HybridKNearestNeighbors

class dtaianomaly.anomaly_detection.HybridKNearestNeighbors(window_size: int | Literal['fft', 'acf', 'mwf', 'suss'], stride: int = 1, n_neighbors: int = 5, n_estimators: int = 3, max_samples: float | int = 'auto', metric: str = 'euclidean', hidden_layer_dimensions: list[int] = (64,), latent_space_dimension: int = 16, activation_function: Literal['linear', 'relu', 'sigmoid', 'tanh'] = 'relu', batch_size: int = 32, n_epochs: int = 5, learning_rate: float = 0.001, device: str = 'cpu', seed: int = None)

Anomaly detection based on a hybrid K-NN with AutoEncoder embedding [33].

Combine an autoencoder, which learns a latent space representation of the subsequences, with an ensemble of K-NN instances. At training time, the autoencoder is fitted on subsequences from the training time series to embed them into a latent space. The latent space embeddings are then split into multiple subsets in a bagging-like manner. For each subset, a K-NN instance is initialized, and the average K-th nearest neighbor distance of each sample to the other subsets is computed. At prediction time, the autoencoder embeds the test sequences into the latent space, and the average K-th nearest neighbor distance of each test sequence across all subsets is computed. The anomaly score of a test sequence with respect to a subset is the proportion of samples in that subset with a smaller average distance. The final anomaly score is the average of these scores across all subsets.
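The scoring procedure described above can be sketched in plain NumPy. This is a minimal illustration under stated assumptions, not the library's implementation: the helper names (kth_nn_distance, fit_subsets, anomaly_scores) are hypothetical, random vectors stand in for the autoencoder embeddings, the subsets are formed by a simple disjoint split, and Euclidean distance is used throughout.

```python
import numpy as np


def kth_nn_distance(queries, reference, k):
    """Distance from each query to its k-th nearest point in `reference` (k is 1-based)."""
    diffs = queries[:, None, :] - reference[None, :, :]
    dists = np.sqrt((diffs ** 2).sum(axis=2))  # shape (n_queries, n_reference)
    return np.partition(dists, k - 1, axis=1)[:, k - 1]


def fit_subsets(embeddings, n_estimators, k, rng):
    """Split the embeddings into disjoint subsets (assumed splitting scheme) and
    compute, per subset, the average k-th neighbor distance to the other subsets."""
    indices = rng.permutation(len(embeddings))
    subsets = [embeddings[idx] for idx in np.array_split(indices, n_estimators)]
    g = []
    for i, subset in enumerate(subsets):
        others = [s for j, s in enumerate(subsets) if j != i]
        g.append(np.mean([kth_nn_distance(subset, o, k) for o in others], axis=0))
    return subsets, g


def anomaly_scores(test_embeddings, subsets, g, k):
    """Average, over subsets, of the fraction of training samples with a smaller distance."""
    avg_dist = np.mean([kth_nn_distance(test_embeddings, s, k) for s in subsets], axis=0)
    per_subset = [(g_i[None, :] < avg_dist[:, None]).mean(axis=1) for g_i in g]
    return np.mean(per_subset, axis=0)


rng = np.random.default_rng(0)
train = rng.normal(size=(90, 4))  # stand-in for latent embeddings of training windows
test = np.vstack([rng.normal(size=(5, 4)), np.full((1, 4), 50.0)])  # last row is far away
subsets, g = fit_subsets(train, n_estimators=3, k=5, rng=rng)
scores = anomaly_scores(test, subsets, g, k=5)
```

The last test point lies far from all training embeddings, so every training sample has a smaller average distance and its score is the maximum possible value. The sketch needs at least two subsets, each holding at least k samples.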

Parameters:
window_size : int or str

The window size to use for extracting sliding windows from the time series. This value will be passed to compute_window_size().

stride : int, default=1

The stride, i.e., the step size for extracting sliding windows from the time series.

n_neighbors : int, default=5

The number of neighbors to use for the nearest neighbor queries.

n_estimators : int, default=3

The number of K-NN instances, and consequently subsets, to use.

max_samples : int, float or 'auto', default='auto'

The number of samples to draw for each subset:

  • if int: Draw at most max_samples samples.

  • if float: Draw at most the fraction max_samples of the samples.

  • if 'auto': Set max_samples=n_windows/n_estimators.

metric : str, default='euclidean'

Distance metric for distance computations. Any metric of scikit-learn and scipy.spatial.distance can be used.

hidden_layer_dimensions : list of ints, default=[64]

The number of neurons in each hidden layer of the encoder and decoder. The given list equals the ordered sequence of neurons in the encoder. The layers in the decoder have the same dimensions, but mirrored.

latent_space_dimension : int, default=16

The dimension of the latent space.

activation_function : {"linear", "relu", "sigmoid", "tanh"}, default="relu"

The activation function to use at the end of each layer.

batch_size : int, default=32

The size of the batches to feed to the network.

n_epochs : int, default=5

The number of epochs for which the neural network should be trained.

learning_rate : float, default=1e-3

The learning rate to use for training the network.

device : str, default="cpu"

The device on which the neural network should be trained. For more information, see: https://docs.pytorch.org/docs/stable/tensor_attributes.html#torch-device.

seed : int, default=None

The seed used for training the autoencoder and sampling the subsets.
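To illustrate how window_size and stride interact, here is a minimal sketch of sliding window extraction. The helper extract_windows is hypothetical and simplified: the library's own sliding_window() may treat the final window and the flattening order differently. Each window is flattened into one row, which matches the documented shape (n_windows, n_attributes x window_size).

```python
import numpy as np


def extract_windows(x, window_size, stride=1):
    """Return flattened sliding windows, one per row (simplified illustration)."""
    x = np.asarray(x, dtype=float)
    if x.ndim == 1:
        x = x.reshape(-1, 1)  # treat a univariate series as a single attribute
    # Window start positions; windows that would run past the end are dropped
    starts = range(0, x.shape[0] - window_size + 1, stride)
    return np.array([x[s:s + window_size].ravel() for s in starts])


series = np.arange(10.0)
windows = extract_windows(series, window_size=4, stride=2)
# The windows start at time steps 0, 2, 4 and 6, giving an array of shape (4, 4)
```

A larger stride yields fewer windows and therefore a faster fit, at the cost of a coarser coverage of the series.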

Attributes:
window_size_ : int

The effectively used window size for this anomaly detector.

nearest_neighbors_ : list[NearestNeighbors]

The scikit-learn nearest neighbor instances for each subset.

g_ : list[np.ndarray]

For each subset, a vector containing the average distance of each sample within the subset to its K-th nearest neighbor across all other subsets.

auto_encoder_ : torch.nn.Module

The auto encoder used to embed the windows in the time series.

Notes

  • Song et al. [33] assign a binary anomaly score to a test sequence with regard to a subset by checking whether the number of sequences within the subset that have a greater distance than the test sequence exceeds a predefined threshold \(\alpha\) (Equation 10). We drop this step to reduce the number of parameters and to allow for a more fine-grained anomaly score.

  • Currently, a very simple feed-forward auto encoder is implemented. To use a more advanced model, extend this class and override build_auto_encoder(), which must return a torch.nn.Module with fit(windows) and encode(windows) methods. The given windows are those computed by sliding_window().

Examples

>>> from dtaianomaly.anomaly_detection import HybridKNearestNeighbors
>>> from dtaianomaly.data import demonstration_time_series
>>> x, y = demonstration_time_series()
>>> hybrid_knn = HybridKNearestNeighbors(64, seed=0).fit(x)
>>> hybrid_knn.decision_function(x)
array([0.12284644, 0.38202247, 0.43220974, ..., 0.83470662, 0.81722846,
       0.85243446]...)

build_auto_encoder(windows: ndarray) → Module

Build an auto encoder.

Build an auto encoder module that takes as input the given windows and learns to reconstruct them.

Parameters:
windows : array-like of shape (n_windows, n_attributes x window_size)

The windows that will be fed to the auto encoder.

Returns:
torch.nn.Module

Returns a torch neural network module that takes the windows as input and learns to reconstruct them. The module has a .fit(windows) method to learn the weights and an .encode(windows) method to transform the windows into a latent space embedding.
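As a small illustration of how hidden_layer_dimensions and latent_space_dimension could determine the shape of a mirrored feed-forward auto encoder, the sketch below only computes the layer widths. The helper auto_encoder_layer_sizes is hypothetical, and the actual network construction in the library may differ.

```python
def auto_encoder_layer_sizes(input_dimension, hidden_layer_dimensions=(64,), latent_space_dimension=16):
    """Return the layer widths of the encoder and of the mirrored decoder."""
    # Encoder: input -> hidden layers (in the given order) -> latent space
    encoder = [input_dimension, *hidden_layer_dimensions, latent_space_dimension]
    # Decoder: the same widths in reverse order, ending at the input dimension
    decoder = encoder[::-1]
    return encoder, decoder


# A window of 64 time steps over 2 attributes gives a flattened input dimension of 128
encoder, decoder = auto_encoder_layer_sizes(128, hidden_layer_dimensions=[64, 32])
# encoder is [128, 64, 32, 16] and decoder is [16, 32, 64, 128]
```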

check_is_fitted() → None

Raise an error if this object is not fitted.

Check whether this object is fitted, and raise an exception if it is not.

Raises:
NotFittedError

If this object is not fitted.

decision_function(X: ndarray) → array

Compute anomaly scores.

Compute the anomaly scores for the given time series using this detector.

Parameters:
X : array-like of shape (n_samples, n_attributes)

Input time series.

Returns:
array-like of shape (n_samples)

The computed anomaly scores.

fit(X: ndarray, y: ndarray = None, **kwargs) → BaseDetector

Fit this detector.

Fit this detector to the given data.

Parameters:
X : array-like of shape (n_samples, n_attributes)

Input time series.

y : array-like, default=None

Ground-truth information.

**kwargs

Additional parameters to be used to fit the anomaly detector.

Returns:
BaseDetector

Returns the instance itself.

is_fitted() → bool

Check whether this object is fitted.

Check whether all the attributes of this object that end with an underscore (‘_’) have been initialized.

Returns:
bool

True if and only if all the attributes of this object ending with ‘_’ are initialized.

predict_confidence(X: ndarray, X_train: ndarray = None, contamination: float = 0.05, decision_scores_given: bool = False)

Predict the confidence of the anomaly scores on the given test data [26].

This method implements ExCeeD (Example-wise Confidence of anomaly Detectors) to estimate the confidence. ExCeeD transforms the predicted decision scores into probability estimates using a Bayesian approach. This makes it possible to assign a confidence score to each prediction, capturing the uncertainty of the anomaly detector in that prediction.

Parameters:
X : array-like of shape (n_samples, n_attributes)

The test time series for which the confidence of anomaly scores should be predicted.

X_train : array-like of shape (n_samples_train, n_attributes), default=None

The training time series, which can be used as reference. If X_train=None, the test set is used as reference set.

contamination : float, default=0.05

The (estimated) contamination rate for the data, i.e., the expected fraction of anomalies.

decision_scores_given : bool, default=False

Whether the given X and X_train represent time series data or decision scores. If decision_scores_given=False (default), then the given arrays are interpreted as time series. Otherwise, they are interpreted as decision scores, as computed by decision_function().

Returns:
array-like of shape (n_samples)

The confidence of this anomaly detector in each prediction in the given test time series.
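The idea behind ExCeeD can be sketched as follows. This is an illustration under assumptions rather than the code behind predict_confidence(): the function names are hypothetical, the decision threshold is assumed to be the (1 - contamination) quantile of the reference scores, and the Bayesian estimate uses a uniform prior. For each test score, the sketch estimates the probability that a random reference score is lower, and then computes the probability that the example would land among the flagged anomalies.

```python
import math

import numpy as np


def binomial_sf(k, n, p):
    """P(X > k) for X ~ Binomial(n, p) with 0 < p < 1, summed in log space."""
    total = 0.0
    for i in range(k + 1, n + 1):
        log_pmf = (math.lgamma(n + 1) - math.lgamma(i + 1) - math.lgamma(n - i + 1)
                   + i * math.log(p) + (n - i) * math.log(1 - p))
        total += math.exp(log_pmf)
    return min(total, 1.0)


def exceed_confidence(test_scores, train_scores, contamination=0.05):
    """Confidence in the predicted label of each test score (illustrative sketch)."""
    train_scores = np.asarray(train_scores, dtype=float)
    test_scores = np.asarray(test_scores, dtype=float)
    n = len(train_scores)
    # Assumed decision threshold: scores above it are predicted to be anomalous
    threshold = np.quantile(train_scores, 1 - contamination)
    confidence = np.empty(len(test_scores))
    for j, score in enumerate(test_scores):
        n_lower = np.count_nonzero(train_scores <= score)
        p_outlier = (1 + n_lower) / (2 + n)  # Bayesian estimate with a uniform prior
        p_flagged = binomial_sf(n - int(n * contamination), n, p_outlier)
        # Confidence refers to the predicted class: anomalous or normal
        confidence[j] = p_flagged if score > threshold else 1 - p_flagged
    return confidence


train_scores = np.linspace(0.0, 1.0, 100)
confidence = exceed_confidence([0.0, 0.96, 1.0], train_scores)
```

Scores far from the threshold (here 0.0 and 1.0) receive a confidence close to 1, while the score just above the threshold (0.96) receives a noticeably lower confidence.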

predict_proba(X: ndarray) → ndarray

Predict anomaly probabilities.

Estimate the probability of a sample of X being anomalous, based on the anomaly scores obtained from decision_function by rescaling them to the range of [0, 1] via min-max scaling.

Parameters:
X : array-like of shape (n_samples, n_attributes)

Input time series.

Returns:
array-like of shape (n_samples)

1D array with the same length as X, with values in the interval [0, 1], in which a higher value implies that the instance is more likely to be anomalous.

Raises:
ValueError

If scores is not a valid array.

ValueError

If the prediction scores from decision_function() are constant, but not in the interval [0, 1], because these values cannot unambiguously be transformed to an anomaly probability.
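The min-max scaling and the constant-score case can be sketched as below. The helper min_max_probabilities is hypothetical, and returning constant scores unchanged when they already lie in [0, 1] is an assumption about the intended behavior.

```python
import numpy as np


def min_max_probabilities(scores):
    """Rescale anomaly scores to [0, 1] via min-max scaling (illustrative sketch)."""
    scores = np.asarray(scores, dtype=float)
    low, high = scores.min(), scores.max()
    if low == high:
        # Constant scores carry no ranking information, so they cannot be rescaled
        if not 0.0 <= low <= 1.0:
            raise ValueError("Constant scores outside [0, 1] cannot be rescaled.")
        return scores.copy()  # assumption: already valid probabilities
    return (scores - low) / (high - low)


probabilities = min_max_probabilities([2.0, 4.0, 6.0])
# The lowest score maps to 0, the highest to 1, and 4.0 maps to 0.5
```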

requires_fitting() → bool

Check whether this object requires fitting.

Check whether any of the attributes of this object ends with an underscore (‘_’), which indicates that the attribute is set when the object is fitted. Note that this method does not check whether the object is fitted, i.e., whether the attributes have been set.

Returns:
bool

True if and only if this object has attributes that end with ‘_’.
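The trailing-underscore convention used by requires_fitting() and is_fitted() can be illustrated with a toy class. ToyDetector and its helper are hypothetical, and discovering the fitted attributes through class annotations is only one possible approach; the library may track them differently.

```python
class ToyDetector:
    threshold_: float  # declared here, but only assigned in fit()

    def __init__(self, window_size):
        self.window_size = window_size

    def fit(self, x):
        self.threshold_ = max(x)
        return self

    def _fitted_attribute_names(self):
        # Attributes that are expected to exist only after fitting
        annotations = getattr(type(self), "__annotations__", {})
        return [name for name in annotations if name.endswith("_")]

    def requires_fitting(self):
        # True if at least one attribute is set during fitting
        return len(self._fitted_attribute_names()) > 0

    def is_fitted(self):
        # True if every attribute set during fitting has been initialized
        return all(hasattr(self, name) for name in self._fitted_attribute_names())


detector = ToyDetector(window_size=8)
needs_fit = detector.requires_fitting()
fitted_before = detector.is_fitted()
fitted_after = detector.fit([1.0, 3.0, 2.0]).is_fitted()
```

Here requires_fitting() is true before and after fitting, whereas is_fitted() only becomes true once fit() has assigned threshold_.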

save(path: str | Path) → None

Save this detector.

Save the detector to disk as a pickle file with extension .dtai. If the given path contains subdirectories that do not exist, they are created.

Parameters:
path : str or Path

Location where to store the detector.
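The described behavior (pickling to a .dtai file and creating missing subdirectories) can be sketched with the standard library. The functions save_object and load_object are hypothetical, and appending the extension when it is missing is an assumption; this is not the library's own code.

```python
import pickle
import tempfile
from pathlib import Path


def save_object(obj, path):
    """Pickle `obj` to `path`, creating missing parent directories (sketch)."""
    path = Path(path)
    if path.suffix != ".dtai":
        # Assumption: the extension is appended when it is missing
        path = path.with_name(path.name + ".dtai")
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "wb") as file:
        pickle.dump(obj, file)
    return path


def load_object(path):
    """Load a pickled object back from disk."""
    with open(path, "rb") as file:
        return pickle.load(file)


with tempfile.TemporaryDirectory() as directory:
    stored = save_object({"n_neighbors": 5}, Path(directory) / "models" / "knn")
    restored = load_object(stored)
```

As with any pickle file, a stored detector should only be loaded from a trusted source.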