Source code for dtaianomaly.anomaly_detection._ROCKAD

import numpy as np
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import PowerTransformer
from sklearn.utils import resample
from sktime.transformations.panel.rocket import Rocket

from dtaianomaly.anomaly_detection._BaseDetector import BaseDetector, Supervision
from dtaianomaly.type_validation import (
    BoolAttribute,
    IntegerAttribute,
    NoneAttribute,
    WindowSizeAttribute,
)
from dtaianomaly.utils import get_dimension
from dtaianomaly.windowing import (
    WINDOW_SIZE_TYPE,
    compute_window_size,
    reverse_sliding_window,
)

__all__ = ["ROCKAD"]


[docs] class ROCKAD(BaseDetector): """ Detect anomalies in time series subsequences with ROCKAD :cite:`theissler2023rockad`. ROCKAD uses the ROCKET transformation :cite:`dempster2020rocket` as an unsupervised feature extractor from time series subsequences. Then, a bagging-based ensemble of k-NN models using the ROCKET-features is used to detect anomalous time series subsequences, in which the anomaly score of each individual instance is computed as the distance to the k-th nearest neighbor within each bagging subset. As discussed by :cite:t:`theissler2023rockad`, first applying a power-transform and then standard scaling the ROCKET features improves separation of the normal and anomalous sequences. Parameters ---------- window_size : int or str The window size, the length of the subsequences that will be detected as anomalies. This value will be passed to :py:meth:`~dtaianomaly.anomaly_detection.compute_window_size`. stride : int, default=1 The stride, i.e., the step size for extracting sliding windows from the time series. n_kernels : int, default=100, The number of kernels to use in the ROCKET-transformation. power_transform : bool, default=True Whether to perform a power-transformation or not. n_estimators : int, default=10 The number of k-NN estimators to include in the detection ensemble. n_neighbors : int, default=5 The number of neighbors to use for the nearest neighbor queries. metric : str, default='euclidean' Distance metric for distance computations. any metric of scikit-learn and scipy.spatial.distance can be used. n_jobs : int, default=1 The number of jobs to use, which is passed to the scikit-learn components. seed : int, default=None The random seed used to split the data and initialise the kernels. Attributes ---------- window_size_ : int The effectively used window size for detecting anomalies. rocket_ : Rocket The ``sktime`` Rocket transformer object. power_transformer_ : PowerTransformer The ``sklearn`` power transformer object. The object will only be fitted if ``power_transform=True``. nearest_neighbors_ : list of NearestNeighbors The fitted nearest neighbor instances on a different subset of the instances. Examples -------- >>> from dtaianomaly.anomaly_detection import ROCKAD >>> from dtaianomaly.data import demonstration_time_series >>> x, y = demonstration_time_series() >>> rockad = ROCKAD(64, seed=0).fit(x) >>> rockad.decision_function(x) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE, +SKIP array([5.30759668, 5.25451016, 4.80149563, ..., 3.40483896, 3.72443581, 3.74599171]) """ window_size: WINDOW_SIZE_TYPE stride: int n_kernels: int power_transform: bool n_estimators: int n_neighbors: int metric: str n_jobs: int seed: int | None window_size_: int rocket_: Rocket power_transformer_: PowerTransformer nearest_neighbors_: list[NearestNeighbors] attribute_validation = { "window_size": WindowSizeAttribute(), "stride": IntegerAttribute(1), "n_kernels": IntegerAttribute(1), "n_estimators": IntegerAttribute(1), "n_neighbors": IntegerAttribute(1), "n_jobs": IntegerAttribute(1), "power_transform": BoolAttribute(), "seed": IntegerAttribute() | NoneAttribute(), } def __init__( self, window_size: WINDOW_SIZE_TYPE, stride: int = 1, n_kernels: int = 100, power_transform: bool = True, n_estimators: int = 10, n_neighbors: int = 5, metric: str = "euclidean", n_jobs: int = 1, seed: int = None, ): super().__init__(Supervision.UNSUPERVISED) self.window_size = window_size self.stride = stride self.n_kernels = n_kernels self.n_estimators = n_estimators self.n_neighbors = n_neighbors self.metric = metric self.n_jobs = n_jobs self.power_transform = power_transform self.seed = seed def _fit(self, X: np.ndarray, y: np.ndarray = None, **kwargs) -> None: self.window_size_ = compute_window_size(X, self.window_size, **kwargs) windows = self._sliding_window(X) # Apply ROCKET self.rocket_ = Rocket( num_kernels=self.n_kernels, n_jobs=self.n_jobs, random_state=self.seed ) features = self.rocket_.fit_transform(windows) # Apply power-transformation self.power_transformer_ = PowerTransformer(standardize=True) if self.power_transform: features = self.power_transformer_.fit_transform(features) # Train the ensemble of nearest neighbor models. self.nearest_neighbors_ = [] for i in range(self.n_estimators): # Define a seed for this estimator seed_ = self.seed if self.seed is not None: seed_ += i # Initialize the NN object nearest_neighbors = NearestNeighbors( n_neighbors=self.n_neighbors, metric=self.metric, n_jobs=self.n_jobs ) # Sample a subset to bootstrap resample( features, replace=True, n_samples=None, random_state=seed_, stratify=None, ) # Fit the nearest neighbor instance on the sample nearest_neighbors.fit(features) self.nearest_neighbors_.append(nearest_neighbors) def _decision_function(self, X: np.ndarray) -> np.array: # Create the sliding windows windows = self._sliding_window(X) # Extract the ROCKET features features = self.rocket_.transform(windows) # Apply power transform if self.power_transform: features = self.power_transformer_.transform(features) # Compute the k-th nearest neighbor distance to each ensemble item nearest_neighbors_distances = np.empty( shape=(windows.shape[0], self.n_estimators) ) for i, nearest_neighbors in enumerate(self.nearest_neighbors_): nearest_neighbors_distances[:, i] = nearest_neighbors.kneighbors(features)[ 0 ][:, -1] # Aggregate the scores decision_scores = nearest_neighbors_distances.mean(axis=1) return reverse_sliding_window( decision_scores, self.window_size_, self.stride, X.shape[0] ) def _sliding_window(self, X: np.ndarray) -> np.ndarray: """Custom method to format the windows according to sktime format.""" X = X.reshape(X.shape[0], get_dimension(X)) windows = [ X[t : t + self.window_size_, :].T for t in range(0, X.shape[0] - self.window_size_, self.stride) ] windows.append(X[-self.window_size_ :].T) return np.array(windows)