Source code for dtaianomaly.anomaly_detection._KMeansAnomalyDetector

import numpy as np
from sklearn.cluster import KMeans

from dtaianomaly.anomaly_detection._BaseDetector import BaseDetector, Supervision
from dtaianomaly.type_validation import IntegerAttribute, WindowSizeAttribute
from dtaianomaly.windowing import (
    WINDOW_SIZE_TYPE,
    compute_window_size,
    reverse_sliding_window,
    sliding_window,
)

__all__ = ["KMeansAnomalyDetector"]


[docs] class KMeansAnomalyDetector(BaseDetector): """ Use KMeans clustering to detect anomalies :cite:`yairi2001fault`. KMeans anomaly detector first clusters the data using the KMeasn clustering algorithm. Next, for new data, the corresponding cluster is predicted, and the distance to the cluster centroid is computed. This distance corresponds to the decision scores of this anomaly detector: if an instance is far from the centroid, it is more anomalous. The input of KMeans clustering is a sliding window. Parameters ---------- window_size : int or str The window size to use for extracting sliding windows from the time series. This value will be passed to :py:meth:`~dtaianomaly.anomaly_detection.compute_window_size`. stride : int, default=1 The stride, i.e., the step size for extracting sliding windows from the time series. n_clusters : int, default=8 The number of clusters to use for K-means clustering. **kwargs Arguments to be passed to KMeans clustering of scikit-learn anomaly detector. Attributes ---------- window_size_ : int The effectively used window size for this anomaly detector. k_means_ : KMeans The KMeans clustering algorithm from scikit-learn. Examples -------- >>> from dtaianomaly.anomaly_detection import KMeansAnomalyDetector >>> from dtaianomaly.data import demonstration_time_series >>> x, y = demonstration_time_series() >>> kmeans_ad = KMeansAnomalyDetector(10).fit(x) >>> kmeans_ad.decision_function(x) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE array([0.50321076, 0.5753145 , 0.61938076, ..., 0.29794485, 0.30720306, 0.29857479]...) """ window_size: WINDOW_SIZE_TYPE stride: int n_clusters: int kwargs: dict window_size_: int k_means_: KMeans attribute_validation = { "window_size": WindowSizeAttribute(), "stride": IntegerAttribute(1), "n_clusters": IntegerAttribute(2), } def __init__( self, window_size: WINDOW_SIZE_TYPE, stride: int = 1, n_clusters: int = 8, **kwargs, ): super().__init__(Supervision.UNSUPERVISED) self.window_size = window_size self.stride = stride self.n_clusters = n_clusters self.kwargs = kwargs KMeans(n_clusters=self.n_clusters, **self.kwargs) def _fit(self, X: np.ndarray, y: np.ndarray = None, **kwargs) -> None: self.window_size_ = compute_window_size(X, self.window_size, **kwargs) self.k_means_ = KMeans(n_clusters=self.n_clusters, **self.kwargs) self.k_means_.fit(sliding_window(X, self.window_size_, self.stride)) def _decision_function(self, X: np.ndarray) -> np.array: sliding_windows = sliding_window(X, self.window_size_, self.stride) clusters = self.k_means_.predict(sliding_windows) distance_to_cluster_centers = np.linalg.norm( sliding_windows - self.k_means_.cluster_centers_[clusters], axis=1 ) decision_scores = reverse_sliding_window( distance_to_cluster_centers, self.window_size_, self.stride, X.shape[0] ) return decision_scores