Anomaly detection module

This module contains functionality to detect anomalies. It can be imported as follows:

>>> from dtaianomaly import anomaly_detection

We refer to the documentation for more information regarding detecting anomalies using dtaianomaly.

API cheatsheet

Below there is a quick overview of the most essential methods to detect anomalies:

  1. fit(). Fit the anomaly detector. This method requires both an X (the time series) and y (anomaly labels) parameter. However, in practice, most anomaly detectors will not use the ground truth labels. The parameter y is present for API consistency and is not required.

  2. decision_function(). Compute the decision scores of an observation being an anomaly for a given time series X. Returns an array with an entry for each observation in the time series. Note that this score is not normalized, and depends on the specific anomaly detector. However, for all detectors, a higher score means more anomalous.

  3. predict_proba(). Compute the probability of an anomaly being an anomaly. This is similar to the decision_function() method, but the computed scores are normalized to the interval \([0, 1]\), which enables the interpretation as a probability.

    Note

    The output of a predict_proba is often a matrix of size (n_samples, n_classes). For anomaly detection, this would lead to a matrix with two columns, one columns for the normal probabilities and one column for the anomalous probabilities. However, in dtaianomaly, the predict_proba() only returns the probability of a sample being anomalous, because this is the probability of interest in many anomaly detection applications.

Implemented anomaly detectors

BaseDetector

class dtaianomaly.anomaly_detection.BaseDetector(supervision: Supervision)[source]

Abstract base class for time series anomaly detection.

This base class defines method signatures to build specific anomaly detectors. User-defined detectors can be used throughout the dtaianomaly by extending this base class.

Parameters:

supervision (Supervision) – The type of supervision this anomaly detector requires.

check_is_fitted() None[source]

Check whether this anomaly detector is fitted or not.

Raises:

NotFittedError – If this detector is not fitted yet.

decision_function(X: ndarray) array[source]

Abstract method, compute anomaly scores.

Parameters:

X (array-like of shape (n_samples, n_attributes)) – Input time series.

Returns:

decision_scores – The computed anomaly scores.

Return type:

array-like of shape (n_samples)

fit(X: ndarray, y: ndarray | None = None, **kwargs) BaseDetector[source]

Abstract method, fit this detector to the given data.

Parameters:
  • X (array-like of shape (n_samples, n_attributes)) – Input time series.

  • y (array-like, default=None) – Ground-truth information.

Returns:

self – Returns the instance itself.

Return type:

BaseDetector

is_fitted() bool[source]

Return whether this anomaly detector is fitted.

Returns:

is_fitted – True if and only if this detector is fitted, and can be used for detecting anomalies.

Return type:

bool

predict_confidence(X: ndarray, X_train: ndarray = None, contamination: float = 0.05, decision_scores_given: bool = False)[source]

Predict the confidence of the anomaly scores on the test given test data.

This method implements ExCeeD [perini2020quantifying] (Example-wise Confidence of anomaly Detectors) to estimate the confidence. ExCeed transforms the predicted decision scores to probability estimates using a Bayesian approach, which enables to assign a confidence score to each prediction which captures the uncertainty of the anomaly detector in that prediction.

Parameters:
  • X (array-like of shape (n_samples, n_attributes)) – The test time series for which the confidence of anomaly scores should be predicted.

  • X_train (array-like of shape (n_samples_train, n_attributes), default=None) – The training time series, which can be used as reference. If X_train=None, the test set is used as reference set.

  • contamination (float, default=0.05) – The (estimated) contamination rate for the data, i.e., the expected percentage of anomalies.

  • decision_scores_given (bool, default=False) – Whether the given X and X_train represent time series data or decision scores. If decision_scores_given=False (default), then the given arrays are interpreted as time series. Otherwise, they are interpreted as decision scores, as computed by decision_function().

Returns:

confidence – The confidence of this anomaly detector in each prediction in the given test time series.

Return type:

array-like of shape (n_samples)

References

[perini2020quantifying]

Perini, L., Vercruyssen, V., Davis, J. Quantifying the Confidence of Anomaly Detectors in Their Example-Wise Predictions. In: Machine Learning and Knowledge Discovery in Databases. ECML PKDD 2020. Springer, Cham, doi: 10.1007/978-3-030-67664-3_14.

predict_proba(X: ndarray) ndarray[source]

Predict anomaly probabilities

Estimate the probability of a sample of X being anomalous, based on the anomaly scores obtained from decision_function by rescaling them to the range of [0, 1] via min-max scaling.

Parameters:

X (array-like of shape (n_samples, n_attributes)) – Input time series.

Returns:

anomaly_scores – 1D array with the same length as X, with values in the interval [0, 1], in which a higher value implies that the instance is more likely to be anomalous.

Return type:

array-like of shape (n_samples)

Raises:
  • ValueError – If scores is not a valid array.

  • ValueError – If the prediction scores from ‘decision_function’ are constant, but not in the interval [0, 1], because these values can not unambiguously be transformed to an anomaly probability.

save(path: str | Path) None[source]

Save detector to disk as a pickle file with extension .dtai. If the given path consists of multiple subdirectories, then the not existing subdirectories are created.

Parameters:

path (str or Path) – Location where to store the detector.

class dtaianomaly.anomaly_detection.Supervision(*values)[source]

An enum for the different supervision types for anomaly detectors. Valid supervision types are:

  • Unsupervised: the anomaly detector does not need any labels or training data.

  • Semi-supervised: The anomaly detector requires normal training data, but no training labels.

  • Supervised: The anomaly detector requires both training data and training labels. The training data may contain anomalies.

Utilities

dtaianomaly.anomaly_detection.load_detector(path: str | Path) BaseDetector[source]

Load a detector from disk.

Warning: method relies on pickle. Only load trusted files!

Parameters:

path (str or Path) – Location of the stored detector.

Returns:

detector – The loaded detector.

Return type:

BaseDetector

dtaianomaly.anomaly_detection.sliding_window(X: ndarray, window_size: int, stride: int) ndarray[source]

Constructs a sliding window for the given time series.

Parameters:
  • X (array-like of shape (n_samples, n_attributes)) – The time series

  • window_size (int) – The window size for the sliding windows.

  • stride (int) – The stride, i.e., the step size for the windows.

Returns:

windows – The windows as a 2D numpy array. Each row corresponds to a window. For windows of multivariate time series are flattened to form a 1D array of length the number of attributes multiplied by the window size.

Return type:

np.ndarray of shape ((n_samples - window_size)/stride + 1, n_attributes * window_size)

dtaianomaly.anomaly_detection.reverse_sliding_window(per_window_anomaly_scores: ndarray, window_size: int, stride: int, length_time_series: int) ndarray[source]

Reverses the sliding window, to convert the per-window anomaly scores into per-observation anomaly scores.

For non-overlapping sliding windows, it is trivial to convert the per-window anomaly scores to per-observation scores, because each observation is linked to only one window. For overlapping windows, certain observations are linked to one or more windows (depending on the window size and stride), obstructing simply copying the corresponding per-window anomaly score to each window. In the case of multiple overlapping windows, the anomaly score of the observation is set to the mean of the corresponding per-window anomaly scores.

Parameters:
  • per_window_anomaly_scores (array-like of shape (n_windows))

  • window_size (int) – The window size used for creating windows

  • stride (int) – The stride, i.e., the step size used for creating windows

  • length_time_series (int) – The original length of the time series.

Returns:

anomaly_scores – The per-observation anomaly scores.

Return type:

np.ndarray of shape (length_time_series)

dtaianomaly.anomaly_detection.check_is_valid_window_size(window_size: int | str) None[source]

Checks if the given window size is valid or not. If the window size is not valid, a ValueError will be raised. Valid window sizes include:

  • a strictly positive integer

  • a string from the set {'fft', 'acf', 'mwf', 'suss'}

Parameters:

window_size (int or string) – The valid to check if it is valid or not.

Raises:

ValueError – If the given window_size is not a valid window size.

dtaianomaly.anomaly_detection.compute_window_size(X: ndarray, window_size: int | str, lower_bound: int = 10, relative_lower_bound: float = 0.0, upper_bound: int = 1000, relative_upper_bound: float = 1.0, threshold: float = 0.89, default_window_size: int = None) int[source]

Compute the window size of the given time series [ermshaus2023window].

Parameters:
  • X (array-like of shape (n_samples, n_attributes)) – Input time series.

  • window_size (int or str) –

    The method by which a window size should be computed. Valid options are:

    • int: Simply return the given window size.

    • 'fft': Compute the window size by selecting the dominant Fourier frequency.

    • 'acf': Compute the window size as the leg with the highest autocorrelation.

    • 'mwf': Computes the window size using the Multi-Window-Finder method [shima2021multi].

    • 'suss': Computes the window size using the Summary Statistics Subsequence method [ermshaus2023clasp].

  • lower_bound (int, default=10) – The lower bound on the automatically computed window size. Only used if window_size equals 'fft', 'acf', 'mwf' or 'suss'.

  • relative_lower_bound (float, default=0.0) – The lower bound on the automatically computed window size, relative to the length of the given time series. Only used if window_size equals 'fft', 'acf', 'mwf' or 'suss'.

  • upper_bound (int, default=1000) – The lower bound on the automatically computed window size. Only used if window_size equals 'fft', 'acf', or 'mwf'.

  • relative_upper_bound (float, default=1.0) – The upper bound on the automatically computed window size, relative to the length of the given time series. Only used if window_size equals 'fft', 'acf', or 'mwf'.

  • threshold (float, default=0.89) – The threshold for selecting the optimal window size using 'suss'.

  • default_window_size (int, default=None) – The default window size, in case an invalid automatic window size was computed. By default, the value is set to None, which means that an error is thrown.

Returns:

window_size_ – The computed window size.

Return type:

int

References

[ermshaus2023window]

Ermshaus, Arik, Patrick Schäfer, and Ulf Leser. “Window size selection in unsupervised time series analytics: A review and benchmark.” International Workshop on Advanced Analytics and Learning on Temporal Data. Springer, Cham, 2023, doi: 10.1007/978-3-031-24378-3_6

[shima2021multi]

Imani, Shima, and Eamonn Keogh. “Multi-window-finder: domain agnostic window size for time series data.” Proceedings of the MileTS 21 (2021).

[ermshaus2023clasp]

Ermshaus, Arik, Patrick Schäfer, and Ulf Leser. “ClaSP: parameter-free time series segmentation.” Data Mining and Knowledge Discovery 37.3 (2023): 1262-1300, doi: 10.1007/s10618-023-00923-x