Anomaly detection module

This module contains functionality to detect anomalies. It can be imported as follows:

>>> from dtaianomaly import anomaly_detection

We refer to the documentation for more information regarding detecting anomalies using dtaianomaly.

API cheatsheet

Below is a quick overview of the most essential methods to detect anomalies:

  1. fit(). Fit the anomaly detector. This method takes a time series X and, optionally, anomaly labels y. In practice, most anomaly detectors do not use the ground-truth labels, so y is mainly present for API consistency and is not required.

  2. decision_function(). Compute, for a given time series X, a decision score for each observation that indicates how anomalous it is. Returns an array with one entry per observation in the time series. This score is not normalized, and its scale depends on the specific anomaly detector. However, for all detectors, a higher score means more anomalous.

  3. predict_proba(). Compute the probability of each observation being an anomaly. This is similar to the decision_function() method, but the computed scores are normalized to the interval [0, 1], which allows them to be interpreted as probabilities.

    Note

    The output of predict_proba() is typically a matrix of shape (n_samples, n_classes). For anomaly detection, this would be a matrix with two columns: one for the probability of being normal and one for the probability of being anomalous. However, in dtaianomaly, predict_proba() only returns the probability of a sample being anomalous, because this is the probability of interest in many anomaly detection applications.
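To make the cheatsheet concrete, the sketch below implements the three methods for a hypothetical detector, ZScoreDetector, which scores each observation by its largest absolute z-score relative to the data seen in fit(). It does not use dtaianomaly; the class name and scoring rule are assumptions chosen only to illustrate the contract: y is accepted but ignored, higher scores mean more anomalous, and predict_proba() returns one value per observation in [0, 1].

```python
import numpy as np


class ZScoreDetector:
    """Hypothetical detector illustrating the fit / decision_function / predict_proba contract."""

    def fit(self, X, y=None, **kwargs):
        # y is accepted for API consistency but ignored: this toy detector is unsupervised.
        X = np.asarray(X, dtype=float).reshape(len(X), -1)
        self.mean_ = X.mean(axis=0)
        self.std_ = X.std(axis=0) + 1e-12  # avoid division by zero for constant attributes
        return self

    def decision_function(self, X):
        # Unnormalized score per observation: the largest absolute z-score over all attributes.
        X = np.asarray(X, dtype=float).reshape(len(X), -1)
        return np.abs((X - self.mean_) / self.std_).max(axis=1)

    def predict_proba(self, X):
        # Min-max scale the scores to [0, 1]: a single value per observation, not one column
        # per class. Assumes the scores are not all equal.
        scores = self.decision_function(X)
        return (scores - scores.min()) / (scores.max() - scores.min())


rng = np.random.default_rng(0)
X = rng.normal(size=200)
X[150] = 8.0  # inject an obvious anomaly
detector = ZScoreDetector().fit(X)
proba = detector.predict_proba(X)
```

The injected observation at index 150 receives the highest probability, and the returned array has shape (200,) rather than (200, 2).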

Implemented anomaly detectors

class dtaianomaly.anomaly_detection.Supervision(*values)

An enum for the different supervision types for anomaly detectors. Valid supervision types are:

  • Unsupervised: The anomaly detector does not need any labels or training data.

  • Semi-supervised: The anomaly detector requires normal training data, but no training labels.

  • Supervised: The anomaly detector requires both training data and training labels. The training data may contain anomalies.

BaseDetector

class dtaianomaly.anomaly_detection.BaseDetector(supervision: Supervision)

Abstract base class for time series anomaly detection.

This base class defines the method signatures for building specific anomaly detectors. User-defined detectors can be used throughout dtaianomaly by extending this base class.

Parameters:

supervision (Supervision) – The type of supervision this anomaly detector requires.

check_is_fitted() -> None

Check whether this anomaly detector is fitted or not.

Raises:

NotFittedError – If this detector is not fitted yet.

decision_function(X: ndarray) -> array

Abstract method, compute anomaly scores.

Parameters:

X (array-like of shape (n_samples, n_attributes)) – Input time series.

Returns:

decision_scores – The computed anomaly scores.

Return type:

array-like of shape (n_samples)

fit(X: ndarray, y: ndarray = None, **kwargs) -> BaseDetector

Abstract method, fit this detector to the given data.

Parameters:
  • X (array-like of shape (n_samples, n_attributes)) – Input time series.

  • y (array-like, default=None) – Ground-truth information.

Returns:

self – Returns the instance itself.

Return type:

BaseDetector

is_fitted() -> bool

Return whether this anomaly detector is fitted.

Returns:

is_fitted – True if and only if this detector is fitted, and can be used for detecting anomalies.

Return type:

bool

predict_confidence(X: ndarray, X_train: ndarray = None, contamination: float = 0.05, decision_scores_given: bool = False)

Predict the confidence of the anomaly scores for the given test data.

This method implements ExCeeD [perini2020quantifying] (Example-wise Confidence of anomaly Detectors) to estimate the confidence. ExCeeD transforms the predicted decision scores into probability estimates using a Bayesian approach, which makes it possible to assign each prediction a confidence score that captures the uncertainty of the anomaly detector in that prediction.

Parameters:
  • X (array-like of shape (n_samples, n_attributes)) – The test time series for which the confidence of anomaly scores should be predicted.

  • X_train (array-like of shape (n_samples_train, n_attributes), default=None) – The training time series, which can be used as reference. If X_train=None, the test set is used as reference set.

  • contamination (float, default=0.05) – The (estimated) contamination rate of the data, i.e., the expected proportion of anomalies.

  • decision_scores_given (bool, default=False) – Whether the given X and X_train represent time series data or decision scores. If decision_scores_given=False (default), then the given arrays are interpreted as time series. Otherwise, they are interpreted as decision scores, as computed by decision_function().

Returns:

confidence – The confidence of this anomaly detector in each prediction in the given test time series.

Return type:

array-like of shape (n_samples)

References

[perini2020quantifying]

Perini, L., Vercruyssen, V., Davis, J. Quantifying the Confidence of Anomaly Detectors in Their Example-Wise Predictions. In: Machine Learning and Knowledge Discovery in Databases. ECML PKDD 2020. Springer, Cham, doi: 10.1007/978-3-030-67664-3_14.
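The sketch below shows one ExCeeD-style way to compute confidences when decision scores are given directly (the decision_scores_given=True case). For each test score it counts how many reference scores are lower or equal, turns that count into a Bayesian outlier probability with a uniform prior, and uses a binomial tail probability as the confidence that the example is predicted anomalous. The function name exceed_confidence is hypothetical, it requires SciPy, and it is an assumption that dtaianomaly uses exactly this formulation, including the contamination-based threshold that decides which examples count as predicted anomalies.

```python
import numpy as np
from scipy.stats import binom


def exceed_confidence(train_scores, test_scores, contamination=0.05):
    """Hypothetical sketch of ExCeeD-style confidence, computed from decision scores."""
    train_scores = np.asarray(train_scores, dtype=float)
    test_scores = np.asarray(test_scores, dtype=float)
    n = len(train_scores)

    # Number of reference scores that are lower than or equal to each test score.
    n_lower = np.searchsorted(np.sort(train_scores), test_scores, side="right")

    # Bayesian estimate (uniform prior) of the probability of being ranked as an outlier.
    outlier_prob = (1 + n_lower) / (2 + n)

    # Confidence in an "anomaly" prediction: probability that the example ranks above
    # the n * (1 - contamination) least anomalous reference examples.
    threshold_rank = n - int(n * contamination)
    confidence_anomalous = 1 - binom.cdf(threshold_rank, n, outlier_prob)

    # Predicted labels via a contamination-based threshold on the reference scores (assumption).
    is_anomaly = test_scores > np.quantile(train_scores, 1 - contamination)

    # For predicted normals, report the confidence in the normal prediction instead.
    return np.where(is_anomaly, confidence_anomalous, 1 - confidence_anomalous)


conf = exceed_confidence(np.arange(100.0), [0.0, 94.0, 200.0])
```

Clearly normal (0.0) and clearly anomalous (200.0) scores receive a confidence close to 1, whereas a score near the contamination threshold (94.0) receives a noticeably lower confidence.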

predict_proba(X: ndarray) -> ndarray

Predict anomaly probabilities

Estimate the probability of a sample of X being anomalous, based on the anomaly scores obtained from decision_function by rescaling them to the range of [0, 1] via min-max scaling.

Parameters:

X (array-like of shape (n_samples, n_attributes)) – Input time series.

Returns:

anomaly_scores – 1D array with the same length as X, with values in the interval [0, 1], in which a higher value implies that the instance is more likely to be anomalous.

Return type:

array-like of shape (n_samples)

Raises:
  • ValueError – If the decision scores are not a valid array.

  • ValueError – If the scores from decision_function() are constant but not in the interval [0, 1], because such values cannot unambiguously be transformed into an anomaly probability.
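A minimal sketch of the min-max rescaling described above, including the constant-score case from the Raises section. The function name min_max_probabilities is hypothetical, and what counts as a "valid array" (non-empty, 1D, finite values) is an assumption rather than dtaianomaly's exact check.

```python
import numpy as np


def min_max_probabilities(decision_scores):
    """Hypothetical sketch: rescale decision scores to [0, 1] with min-max scaling."""
    scores = np.asarray(decision_scores, dtype=float)
    # Assumed validity check: a non-empty 1D array of finite values.
    if scores.ndim != 1 or scores.size == 0 or not np.all(np.isfinite(scores)):
        raise ValueError("scores must be a non-empty 1D array of finite values")

    min_score, max_score = scores.min(), scores.max()
    if min_score == max_score:
        # Constant scores can only be used as probabilities if they already lie in [0, 1].
        if 0.0 <= min_score <= 1.0:
            return scores
        raise ValueError("constant scores outside [0, 1] cannot be mapped to probabilities")
    return (scores - min_score) / (max_score - min_score)
```

For example, scores [2.0, 4.0, 6.0] map to [0.0, 0.5, 1.0], constant scores [0.3, 0.3] are returned unchanged, and constant scores [5.0, 5.0] raise a ValueError.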

save(path: str | Path) -> None

Save the detector to disk as a pickle file with extension .dtai. If the given path contains subdirectories that do not exist yet, they are created.

Parameters:

path (str or Path) – Location where to store the detector.
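The save/load round trip can be sketched with the standard library alone. The helper names save_object and load_object are hypothetical, and appending .dtai when the path lacks that suffix is an assumption about how the extension is applied. As with load_detector(), only unpickle files you trust.

```python
import pickle
from pathlib import Path


def save_object(obj, path):
    """Hypothetical sketch: pickle obj with a .dtai extension, creating missing directories."""
    path = Path(path)
    if path.suffix != ".dtai":
        path = path.parent / (path.name + ".dtai")  # assumption: extension is appended
    path.parent.mkdir(parents=True, exist_ok=True)  # create missing subdirectories
    with open(path, "wb") as f:
        pickle.dump(obj, f)
    return path


def load_object(path):
    """Hypothetical sketch: unpickle an object. Only load files from trusted sources!"""
    with open(path, "rb") as f:
        return pickle.load(f)
```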

BaseNeuralDetector

class dtaianomaly.anomaly_detection.BaseNeuralDetector(supervision: Supervision, window_size: str | int, stride: int = 1, standard_scaling: bool = True, batch_size: int = 32, data_loader_kwargs: dict[str, any] = None, optimizer: Literal['adam', 'sgd'] | Callable[[any], Optimizer] = 'adam', learning_rate: float = 0.001, compile_model: bool = False, compile_mode: Literal['default', 'reduce-overhead', 'max-autotune', 'max-autotune-no-cudagraphs'] = 'default', n_epochs: int = 10, loss_function: Module = MSELoss(), device: str = 'cpu', seed: int = None)

Base class for neural anomaly detectors, based on PyTorch.

This class implements the main functionality for training a model and detecting anomalies, including building the data loader, building the optimizer, and implementing the main training and evaluation loops. Extensions of this class should also implement methods to build the data set and the neural architecture, and to train and evaluate on a single batch.

Parameters:
  • supervision (Supervision) – The type of supervision this anomaly detector requires.

  • window_size (int or str) – The window size to use for extracting sliding windows from the time series. This value will be passed to compute_window_size().

  • stride (int, default=1) – The stride, i.e., the step size for extracting sliding windows from the time series.

  • standard_scaling (bool, default=True) – Whether to standard scale each window independently, before feeding it to the network.

  • batch_size (int, default=32) – The size of the batches to feed to the network.

  • data_loader_kwargs (dictionary, default=None) – Additional kwargs to be passed to the data loader. For more information, see: https://docs.pytorch.org/docs/stable/data.html

  • optimizer ({"adam", "sgd"} or callable, default="adam") – The optimizer to use for learning the weights. If “adam” is given, then the torch.optim.Adam optimizer will be used. If “sgd” is given, then the torch.optim.SGD optimizer will be used. Otherwise, a callable should be given, which takes the network parameters as input and returns an optimizer.

  • learning_rate (float, default=1e-3) – The learning rate to use for training the network. Has no effect if optimizer is a callable.

  • compile_model (bool, default=False) – Whether the network architecture should be compiled or not before training the weights. For more information, see: https://docs.pytorch.org/docs/stable/generated/torch.compile.html

  • compile_mode ({"default", "reduce-overhead", "max-autotune", "max-autotune-no-cudagraphs"}, default="default") – Method to compile the architecture. For more information, see: https://docs.pytorch.org/docs/stable/generated/torch.compile.html

  • n_epochs (int, default=10) – The number of epochs for which the neural network should be trained.

  • loss_function (torch.nn.Module, default=torch.nn.MSELoss()) – The loss function to use for updating the weights.

  • device (str, default="cpu") – The device on which the neural network should be trained. For more information, see: https://docs.pytorch.org/docs/stable/tensor_attributes.html#torch-device

  • seed (int, default=None) – The seed used for training the model. This seed will update the torch and numpy seed at the beginning of the fit method.

window_size_

The effectively used window size for this anomaly detector.

Type:

int

optimizer_

The optimizer used for learning the weights of the network.

Type:

torch.optim.Optimizer

neural_network_

The PyTorch network architecture.

Type:

torch.nn.Module

See also

BaseNeuralForecastingDetector

Use a neural network to forecast the time series, and detect anomalies by measuring the difference with the actual observations.

BaseNeuralReconstructionDetector

Use a neural network to reconstruct windows in the time series, and detect anomalies as windows that are incorrectly reconstructed.
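The standard_scaling option scales each window independently before it is fed to the network. A minimal NumPy sketch of per-window standard scaling, assuming windows are stored as the rows of a 2D array (as returned by sliding_window()); the function name is hypothetical, and whether dtaianomaly treats constant windows or the attributes of multivariate windows the same way is an assumption.

```python
import numpy as np


def standard_scale_windows(windows):
    """Hypothetical sketch: z-normalize each window (row) independently of the others."""
    windows = np.asarray(windows, dtype=float)
    mean = windows.mean(axis=1, keepdims=True)
    std = windows.std(axis=1, keepdims=True)
    # Constant windows have a standard deviation of 0; divide by 1 instead, mapping them to zeros.
    std[std == 0] = 1.0
    return (windows - mean) / std


scaled = standard_scale_windows([[1.0, 2.0, 3.0], [10.0, 20.0, 30.0], [5.0, 5.0, 5.0]])
```

Because every window is scaled on its own, the first two windows become identical after scaling: the network sees their shape, not their offset or amplitude.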

class dtaianomaly.anomaly_detection.BaseNeuralForecastingDetector(window_size: str | int, supervision: Supervision = Supervision.SEMI_SUPERVISED, error_metric: Literal['mean-absolute-error', 'mean-squared-error'] = 'mean-absolute-error', forecast_length: int = 1, stride: int = 1, standard_scaling: bool = True, batch_size: int = 32, data_loader_kwargs: dict[str, any] = None, optimizer: Literal['adam', 'sgd'] | Callable[[any], Optimizer] = 'adam', learning_rate: float = 0.001, compile_model: bool = False, compile_mode: Literal['default', 'reduce-overhead', 'max-autotune', 'max-autotune-no-cudagraphs'] = 'default', n_epochs: int = 10, loss_function: Module = MSELoss(), device: str = 'cpu', seed: int = None)

Base class for forecasting-based neural anomaly detectors.

Forecasting-based anomaly detectors detect anomalies by measuring the difference between a forecasted value and the actually observed value. Specifically, the neural network takes a sliding window of the time series as input and aims to predict the subsequent values. The assumption is that anomalies are much harder to forecast. Thus, the difference between the forecasted and the observed value will be high for anomalies, but low for normal observations.
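The idea can be sketched without a neural network: split the series into (input window, future values) pairs and score each pair by the error between forecast and observation. Here a hypothetical naive forecaster that repeats the last value of each window stands in for the trained network; the function names are also hypothetical, and the way dtaianomaly aligns windows, forecasts, and observations may differ.

```python
import numpy as np


def forecasting_pairs(x, window_size, forecast_length=1, stride=1):
    """Hypothetical sketch: split a univariate series into (input window, future values) pairs."""
    x = np.asarray(x, dtype=float)
    starts = range(0, len(x) - window_size - forecast_length + 1, stride)
    inputs = np.array([x[s:s + window_size] for s in starts])
    targets = np.array([x[s + window_size:s + window_size + forecast_length] for s in starts])
    return inputs, targets


def forecast_errors(forecasts, targets, error_metric="mean-absolute-error"):
    """Per-window error between the forecasted and the observed values."""
    diff = np.asarray(forecasts) - np.asarray(targets)
    if error_metric == "mean-absolute-error":
        return np.abs(diff).mean(axis=1)
    if error_metric == "mean-squared-error":
        return (diff ** 2).mean(axis=1)
    raise ValueError(f"unknown error_metric: {error_metric}")


x = np.sin(np.linspace(0, 8 * np.pi, 200))
x[120] += 3.0  # injected anomaly
inputs, targets = forecasting_pairs(x, window_size=16, forecast_length=1)
forecasts = inputs[:, -1:]  # naive stand-in forecaster: repeat the last observed value
errors = forecast_errors(forecasts, targets)
```

The largest error lands at observation 120 or 121 (window index plus 16): the spike is hard to forecast, and because the naive forecaster copies it, it also spoils the forecast of the next value.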

Parameters:
  • window_size (int or str) – The window size to use for extracting sliding windows from the time series. This value will be passed to compute_window_size().

  • supervision (Supervision, default=Supervision.SEMI_SUPERVISED) – The type of supervision this anomaly detector requires.

  • error_metric ({"mean-absolute-error", "mean-squared-error"}, default="mean-absolute-error") – The error measure between the forecasted value and the observed values.

  • forecast_length (int, default=1) – The number of time steps the neural network must forecast.

  • stride (int, default=1) – The stride, i.e., the step size for extracting sliding windows from the time series.

  • standard_scaling (bool, default=True) – Whether to standard scale each window independently, before feeding it to the network.

  • batch_size (int, default=32) – The size of the batches to feed to the network.

  • data_loader_kwargs (dictionary, default=None) – Additional kwargs to be passed to the data loader. For more information, see: https://docs.pytorch.org/docs/stable/data.html

  • optimizer ({"adam", "sgd"} or callable, default="adam") – The optimizer to use for learning the weights. If “adam” is given, then the torch.optim.Adam optimizer will be used. If “sgd” is given, then the torch.optim.SGD optimizer will be used. Otherwise, a callable should be given, which takes the network parameters as input and returns an optimizer.

  • learning_rate (float, default=1e-3) – The learning rate to use for training the network. Has no effect if optimizer is a callable.

  • compile_model (bool, default=False) – Whether the network architecture should be compiled or not before training the weights. For more information, see: https://docs.pytorch.org/docs/stable/generated/torch.compile.html

  • compile_mode ({"default", "reduce-overhead", "max-autotune", "max-autotune-no-cudagraphs"}, default="default") – Method to compile the architecture. For more information, see: https://docs.pytorch.org/docs/stable/generated/torch.compile.html

  • n_epochs (int, default=10) – The number of epochs for which the neural network should be trained.

  • loss_function (torch.nn.Module, default=torch.nn.MSELoss()) – The loss function to use for updating the weights.

  • device (str, default="cpu") – The device on which the neural network should be trained. For more information, see: https://docs.pytorch.org/docs/stable/tensor_attributes.html#torch-device

  • seed (int, default=None) – The seed used for training the model. This seed will update the torch and numpy seed at the beginning of the fit method.

window_size_

The effectively used window size for this anomaly detector.

Type:

int

optimizer_

The optimizer used for learning the weights of the network.

Type:

torch.optim.Optimizer

neural_network_

The PyTorch network architecture.

Type:

torch.nn.Module

See also

MultilayerPerceptron

An implementation of this class using a feed-forward neural network.

class dtaianomaly.anomaly_detection.BaseNeuralReconstructionDetector(window_size: str | int, supervision: Supervision = Supervision.SEMI_SUPERVISED, error_metric: Literal['mean-absolute-error', 'mean-squared-error'] = 'mean-absolute-error', stride: int = 1, standard_scaling: bool = True, batch_size: int = 32, data_loader_kwargs: dict[str, any] = None, optimizer: Literal['adam', 'sgd'] | Callable[[any], Optimizer] = 'adam', learning_rate: float = 0.001, compile_model: bool = False, compile_mode: Literal['default', 'reduce-overhead', 'max-autotune', 'max-autotune-no-cudagraphs'] = 'default', n_epochs: int = 10, loss_function: Module = MSELoss(), device: str = 'cpu', seed: int = None)

Base class for reconstruction-based neural anomaly detectors.

Reconstruction-based anomaly detectors detect anomalies by learning to reconstruct the data. Specifically, the neural network takes a sliding window of the time series as input and learns to output exactly the same data. Training on a normal time series enables the network to learn normal behavior, and as a consequence it can accurately reconstruct normal data. However, anomalous subsequences, which were not seen during training, cannot be accurately reconstructed and will therefore have a larger reconstruction error.
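The reconstruction principle can be illustrated with a linear stand-in for the neural network: a rank-k PCA projection (truncated SVD) learned from normal windows plays the role of a trained autoencoder. This is a swapped-in technique for illustration only, not how dtaianomaly's AutoEncoder works, and all function names are hypothetical. Windows of the normal sine lie in a 2D subspace and are reconstructed almost perfectly, whereas windows containing the injected spike are not.

```python
import numpy as np


def windows_of(x, window_size):
    """Sliding windows with stride 1 (one row per window)."""
    return np.array([x[s:s + window_size] for s in range(len(x) - window_size + 1)])


def fit_linear_reconstructor(train_windows, n_components=2):
    """Stand-in for a trained autoencoder: a rank-k PCA projection learned from normal windows."""
    mean = train_windows.mean(axis=0)
    # Rows of vt are the principal directions of the centered normal windows.
    _, _, vt = np.linalg.svd(train_windows - mean, full_matrices=False)
    return mean, vt[:n_components]


def reconstruction_errors(windows, mean, components, error_metric="mean-absolute-error"):
    """Per-window error between each window and its reconstruction."""
    reconstructed = (windows - mean) @ components.T @ components + mean
    diff = windows - reconstructed
    if error_metric == "mean-absolute-error":
        return np.abs(diff).mean(axis=1)
    return (diff ** 2).mean(axis=1)  # "mean-squared-error"


t = np.arange(1000)
train = np.sin(2 * np.pi * t / 50)  # normal behavior only
test = np.sin(2 * np.pi * t / 50)
test[500:505] += 2.0  # anomalous subsequence

mean, components = fit_linear_reconstructor(windows_of(train, 50))
errors = reconstruction_errors(windows_of(test, 50), mean, components)
```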

Parameters:
  • window_size (int or str) – The window size to use for extracting sliding windows from the time series. This value will be passed to compute_window_size().

  • supervision (Supervision, default=Supervision.SEMI_SUPERVISED) – The type of supervision this anomaly detector requires.

  • error_metric ({"mean-absolute-error", "mean-squared-error"}, default="mean-absolute-error") – The error measure between the reconstructed window and the original window.

  • stride (int, default=1) – The stride, i.e., the step size for extracting sliding windows from the time series.

  • standard_scaling (bool, default=True) – Whether to standard scale each window independently, before feeding it to the network.

  • batch_size (int, default=32) – The size of the batches to feed to the network.

  • data_loader_kwargs (dictionary, default=None) – Additional kwargs to be passed to the data loader. For more information, see: https://docs.pytorch.org/docs/stable/data.html

  • optimizer ({"adam", "sgd"} or callable, default="adam") – The optimizer to use for learning the weights. If “adam” is given, then the torch.optim.Adam optimizer will be used. If “sgd” is given, then the torch.optim.SGD optimizer will be used. Otherwise, a callable should be given, which takes the network parameters as input and returns an optimizer.

  • learning_rate (float, default=1e-3) – The learning rate to use for training the network. Has no effect if optimizer is a callable.

  • compile_model (bool, default=False) – Whether the network architecture should be compiled or not before training the weights. For more information, see: https://docs.pytorch.org/docs/stable/generated/torch.compile.html

  • compile_mode ({"default", "reduce-overhead", "max-autotune", "max-autotune-no-cudagraphs"}, default="default") – Method to compile the architecture. For more information, see: https://docs.pytorch.org/docs/stable/generated/torch.compile.html

  • n_epochs (int, default=10) – The number of epochs for which the neural network should be trained.

  • loss_function (torch.nn.Module, default=torch.nn.MSELoss()) – The loss function to use for updating the weights.

  • device (str, default="cpu") – The device on which the neural network should be trained. For more information, see: https://docs.pytorch.org/docs/stable/tensor_attributes.html#torch-device

  • seed (int, default=None) – The seed used for training the model. This seed will update the torch and numpy seed at the beginning of the fit method.

window_size_

The effectively used window size for this anomaly detector.

Type:

int

optimizer_

The optimizer used for learning the weights of the network.

Type:

torch.optim.Optimizer

neural_network_

The PyTorch network architecture.

Type:

torch.nn.Module

See also

AutoEncoder

An implementation of this class using a feed-forward autoencoder.

Utilities

dtaianomaly.anomaly_detection.load_detector(path: str | Path) -> BaseDetector

Load a detector from disk.

Warning: this method relies on pickle. Only load trusted files!

Parameters:

path (str or Path) – Location of the stored detector.

Returns:

detector – The loaded detector.

Return type:

BaseDetector

dtaianomaly.anomaly_detection.sliding_window(X: ndarray, window_size: int, stride: int) -> ndarray

Constructs sliding windows for the given time series.

Parameters:
  • X (array-like of shape (n_samples, n_attributes)) – The time series

  • window_size (int) – The window size for the sliding windows.

  • stride (int) – The stride, i.e., the step size for the windows.

Returns:

windows – The windows as a 2D numpy array. Each row corresponds to a window. Windows of multivariate time series are flattened into 1D arrays whose length equals the number of attributes multiplied by the window size.

Return type:

np.ndarray of shape ((n_samples - window_size)/stride + 1, n_attributes * window_size)
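A minimal sketch of the documented output shape, assuming windows start at 0, stride, 2 * stride, and so on, and that each (window_size, n_attributes) block is flattened in time-major order. The function name is hypothetical; how dtaianomaly handles an incomplete tail when (n_samples - window_size) is not a multiple of the stride is an assumption (this sketch simply drops it).

```python
import numpy as np


def sliding_window_sketch(X, window_size, stride):
    """Hypothetical sketch: flattened windows starting at 0, stride, 2 * stride, ..."""
    X = np.asarray(X)
    starts = range(0, X.shape[0] - window_size + 1, stride)
    # ravel() flattens each (window_size, n_attributes) window in time-major order.
    return np.array([X[s:s + window_size].ravel() for s in starts])


X = np.arange(20).reshape(10, 2)  # 10 observations, 2 attributes
windows = sliding_window_sketch(X, window_size=4, stride=2)
print(windows.shape)  # (4, 8): (10 - 4) / 2 + 1 = 4 windows of 2 * 4 values
```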

dtaianomaly.anomaly_detection.reverse_sliding_window(per_window_anomaly_scores: ndarray, window_size: int, stride: int, length_time_series: int) -> ndarray

Reverses the sliding window, to convert the per-window anomaly scores into per-observation anomaly scores.

For non-overlapping sliding windows, it is trivial to convert the per-window anomaly scores to per-observation scores, because each observation is linked to exactly one window. For overlapping windows, certain observations are linked to multiple windows (depending on the window size and stride), which prevents simply copying the per-window anomaly score to each observation in the window. In that case, the anomaly score of an observation is set to the mean of the anomaly scores of all windows that contain it.

Parameters:
  • per_window_anomaly_scores (array-like of shape (n_windows)) – The anomaly score of each window.

  • window_size (int) – The window size used for creating windows

  • stride (int) – The stride, i.e., the step size used for creating windows

  • length_time_series (int) – The original length of the time series.

Returns:

anomaly_scores – The per-observation anomaly scores.

Return type:

np.ndarray of shape (length_time_series)
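The averaging rule above can be sketched as follows, assuming window i starts at observation i * stride. The function name is hypothetical, and giving a score of 0 to observations that no window covers is an assumption of this sketch, not documented behavior.

```python
import numpy as np


def reverse_sliding_window_sketch(per_window_scores, window_size, stride, length_time_series):
    """Hypothetical sketch: average the scores of all windows that cover each observation."""
    totals = np.zeros(length_time_series)
    counts = np.zeros(length_time_series)
    for i, score in enumerate(per_window_scores):
        start = i * stride  # window i covers observations [start, start + window_size)
        totals[start:start + window_size] += score
        counts[start:start + window_size] += 1
    # Observations not covered by any window get a score of 0 in this sketch.
    return np.divide(totals, counts, out=np.zeros_like(totals), where=counts > 0)


scores = reverse_sliding_window_sketch([1.0, 3.0, 5.0], window_size=3, stride=1, length_time_series=5)
print(scores)  # [1. 2. 3. 4. 5.]
```

The middle observation is covered by all three windows and receives (1 + 3 + 5) / 3 = 3, while the first and last observations each belong to a single window and keep that window's score.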

dtaianomaly.anomaly_detection.check_is_valid_window_size(window_size: int | str) -> None

Checks if the given window size is valid or not. If the window size is not valid, a ValueError will be raised. Valid window sizes include:

  • a strictly positive integer

  • a string from the set {'fft', 'acf', 'mwf', 'suss'}

Parameters:

window_size (int or string) – The window size to check.

Raises:

ValueError – If the given window_size is not a valid window size.
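A minimal sketch of the validity rules listed above. The function name is hypothetical; accepting NumPy integers (via numbers.Integral) and rejecting booleans, even though bool is a subclass of int, are assumptions of this sketch rather than documented behavior.

```python
import numbers

VALID_WINDOW_SIZE_METHODS = {"fft", "acf", "mwf", "suss"}


def check_window_size_sketch(window_size):
    """Hypothetical sketch: raise ValueError unless window_size is valid."""
    if isinstance(window_size, bool):
        # Assumption: booleans are rejected even though bool is a subclass of int.
        raise ValueError(f"invalid window size: {window_size!r}")
    if isinstance(window_size, numbers.Integral):
        if window_size <= 0:
            raise ValueError(f"window size must be strictly positive, got {window_size}")
        return
    if isinstance(window_size, str) and window_size in VALID_WINDOW_SIZE_METHODS:
        return
    raise ValueError(f"invalid window size: {window_size!r}")
```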

dtaianomaly.anomaly_detection.compute_window_size(X: ndarray, window_size: int | str, lower_bound: int = 10, relative_lower_bound: float = 0.0, upper_bound: int = 1000, relative_upper_bound: float = 1.0, threshold: float = 0.89, default_window_size: int = None) -> int

Compute the window size of the given time series [9].

Parameters:
  • X (array-like of shape (n_samples, n_attributes)) – Input time series.

  • window_size (int or str) –

    The method by which a window size should be computed. Valid options are:

    • int: Simply return the given window size.

    • 'fft': Compute the window size by selecting the dominant Fourier frequency.

    • 'acf': Compute the window size as the lag with the highest autocorrelation.

    • 'mwf': Computes the window size using the Multi-Window-Finder method [14].

    • 'suss': Computes the window size using the Summary Statistics Subsequence method [8].

  • lower_bound (int, default=10) – The lower bound on the automatically computed window size. Only used if window_size equals 'fft', 'acf', 'mwf' or 'suss'.

  • relative_lower_bound (float, default=0.0) – The lower bound on the automatically computed window size, relative to the length of the given time series. Only used if window_size equals 'fft', 'acf', 'mwf' or 'suss'.

  • upper_bound (int, default=1000) – The upper bound on the automatically computed window size. Only used if window_size equals 'fft', 'acf', or 'mwf'.

  • relative_upper_bound (float, default=1.0) – The upper bound on the automatically computed window size, relative to the length of the given time series. Only used if window_size equals 'fft', 'acf', or 'mwf'.

  • threshold (float, default=0.89) – The threshold for selecting the optimal window size using 'suss'.

  • default_window_size (int, default=None) – The default window size, used in case the automatically computed window size is invalid. By default, the value is set to None, which means that an error is raised.

Returns:

window_size_ – The computed window size.

Return type:

int
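Simplified sketches of the 'fft' and 'acf' options for a univariate series. They honour only the absolute lower_bound and upper_bound; the relative bounds, 'mwf', 'suss', threshold, and default_window_size are omitted, and the exact definitions dtaianomaly uses (for example, how multiple autocorrelation peaks are handled) are assumptions. The function names are hypothetical. The ACF sketch divides every lag by the same total variance, which slightly favours shorter lags and therefore prefers the fundamental period over its multiples.

```python
import numpy as np


def fft_window_size(x, lower_bound=10, upper_bound=1000):
    """Hypothetical sketch: period of the dominant Fourier frequency within the bounds."""
    x = np.asarray(x, dtype=float)
    n = len(x)
    power = np.abs(np.fft.rfft(x - x.mean())) ** 2
    k = np.arange(1, len(power))  # skip the zero-frequency (DC) component
    periods = n / k
    valid = (periods >= lower_bound) & (periods <= upper_bound)
    if not np.any(valid):
        raise ValueError("no frequency within the given bounds")
    best_k = k[valid][np.argmax(power[1:][valid])]
    return int(round(n / best_k))


def acf_window_size(x, lower_bound=10, upper_bound=1000):
    """Hypothetical sketch: lag with the highest autocorrelation within the bounds."""
    x = np.asarray(x, dtype=float)
    centered = x - x.mean()
    denominator = np.dot(centered, centered)
    lags = np.arange(lower_bound, min(upper_bound, len(x) // 2) + 1)
    if len(lags) == 0:
        raise ValueError("no lag within the given bounds")
    acf = np.array([np.dot(centered[:-lag], centered[lag:]) / denominator for lag in lags])
    return int(lags[np.argmax(acf)])


t = np.arange(1000)
x = np.sin(2 * np.pi * t / 50)  # period of 50 observations
print(fft_window_size(x), acf_window_size(x))  # 50 50
```

Note that the lower bound matters for 'acf': without it, lag 1 would have the highest autocorrelation for any smooth signal.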