KShape Anomaly Detector

class dtaianomaly.anomaly_detection.KShapeAnomalyDetector(window_size: str | int, n_clusters: int = 3, sequence_length_multiplier: float = 4, overlap_rate: float = 0.5, **kwargs)[source]

Anomaly detector based on KShape-clustering [20].

Use the KShapeAD algorithm to detect anomalies in time series. The subsequences are first clustered using KShape-clustering, in which the clusters represent the different normal behaviors in the data. For each cluster there is also a weight computed based on the size of the cluster and the centrality of that cluster in comparison to the other clusters. Anomalies are then detected by computing a weighted average of the distance of a subsequence to each other cluster. KShapeAD equals the offline version of SAND [3].

Parameters:
  • window_size (int or str) – The window size, the length of the subsequences that will be detected as anomalies. This value will be passed to compute_window_size().

  • n_clusters (int, default=3) – The number of clusters to use for KShape clustering.

  • sequence_length_multiplier (float, default=4) – The amount by which the window size should be multiplied to create sliding windows for clustering the data using KShape. Should be at least 1, to make sure that the cluster-centroids are larger than the sequences to detect anomalies in.

  • overlap_rate (float, default=0.5) – The overlap of the sliding windows for clustering the data. Will be used to compute a relative stride to avoid trivial matches when clustering subsequences.

  • **kwargs – Arguments to be passed to KShape-clustering of tsslearn.

window_size_

The effectively used window size for computing the matrix profile

Type:

int

centroids_

The centroids computed by KShape clustering.

Type:

list of array-like of shape (window_size_*sequence_length_multiplier,)

weights_

The normalized weights corresponding to each cluster.

Type:

list of float

kshape_

The fitted KShape-object of tslearn, used to cluster the data.

Type:

KShape

Examples

>>> from dtaianomaly.anomaly_detection import KShapeAnomalyDetector
>>> from dtaianomaly.data import demonstration_time_series
>>> x, y = demonstration_time_series()
>>> kshape = KShapeAnomalyDetector(window_size=50).fit(x)
>>> kshape.decision_function(x)
array([1.01942655, 1.03008335, 1.03906465, ..., 1.29643677, 1.3256903 , 1.34704128]...)

Notes

KshapeAD only handles univariate time series.

check_is_fitted() None

Check whether this anomaly detector is fitted or not.

Raises:

NotFittedError – If this detector is not fitted yet.

decision_function(X: ndarray) array

Abstract method, compute anomaly scores.

Parameters:

X (array-like of shape (n_samples, n_attributes)) – Input time series.

Returns:

decision_scores – The computed anomaly scores.

Return type:

array-like of shape (n_samples)

fit(X: ndarray, y: ndarray = None, **kwargs) BaseDetector

Abstract method, fit this detector to the given data.

Parameters:
  • X (array-like of shape (n_samples, n_attributes)) – Input time series.

  • y (array-like, default=None) – Ground-truth information.

Returns:

self – Returns the instance itself.

Return type:

BaseDetector

is_fitted() bool

Return whether this anomaly detector is fitted.

Returns:

is_fitted – True if and only if this detector is fitted, and can be used for detecting anomalies.

Return type:

bool

predict_confidence(X: ndarray, X_train: ndarray = None, contamination: float = 0.05, decision_scores_given: bool = False)

Predict the confidence of the anomaly scores on the test given test data.

This method implements ExCeeD [perini2020quantifying] (Example-wise Confidence of anomaly Detectors) to estimate the confidence. ExCeed transforms the predicted decision scores to probability estimates using a Bayesian approach, which enables to assign a confidence score to each prediction which captures the uncertainty of the anomaly detector in that prediction.

Parameters:
  • X (array-like of shape (n_samples, n_attributes)) – The test time series for which the confidence of anomaly scores should be predicted.

  • X_train (array-like of shape (n_samples_train, n_attributes), default=None) – The training time series, which can be used as reference. If X_train=None, the test set is used as reference set.

  • contamination (float, default=0.05) – The (estimated) contamination rate for the data, i.e., the expected percentage of anomalies.

  • decision_scores_given (bool, default=False) – Whether the given X and X_train represent time series data or decision scores. If decision_scores_given=False (default), then the given arrays are interpreted as time series. Otherwise, they are interpreted as decision scores, as computed by decision_function().

Returns:

confidence – The confidence of this anomaly detector in each prediction in the given test time series.

Return type:

array-like of shape (n_samples)

References

[perini2020quantifying]

Perini, L., Vercruyssen, V., Davis, J. Quantifying the Confidence of Anomaly Detectors in Their Example-Wise Predictions. In: Machine Learning and Knowledge Discovery in Databases. ECML PKDD 2020. Springer, Cham, doi: 10.1007/978-3-030-67664-3_14.

predict_proba(X: ndarray) ndarray

Predict anomaly probabilities

Estimate the probability of a sample of X being anomalous, based on the anomaly scores obtained from decision_function by rescaling them to the range of [0, 1] via min-max scaling.

Parameters:

X (array-like of shape (n_samples, n_attributes)) – Input time series.

Returns:

anomaly_scores – 1D array with the same length as X, with values in the interval [0, 1], in which a higher value implies that the instance is more likely to be anomalous.

Return type:

array-like of shape (n_samples)

Raises:
  • ValueError – If scores is not a valid array.

  • ValueError – If the prediction scores from ‘decision_function’ are constant, but not in the interval [0, 1], because these values can not unambiguously be transformed to an anomaly probability.

save(path: str | Path) None

Save detector to disk as a pickle file with extension .dtai. If the given path consists of multiple subdirectories, then the not existing subdirectories are created.

Parameters:

path (str or Path) – Location where to store the detector.

theta_() list[array, float][source]

Computes \(\Theta = \{(C_0, w_0), \dots, (C_k, w_k)\}\), the normal behavior consisting of \(k\) clusters.

Returns:

theta – A list of tuples in which the first element consists of the centroid corresponding to each cluster and the second element corresponds to the normalized weight of that cluster.

Return type:

list of tuples of array-likes of shape (window_size_*sequence_length_multiplier,) and floats