Source code for dtaianomaly.anomaly_detection._BasePyODAnomalyDetector

import abc

import numpy as np
from pyod.models.base import BaseDetector as PyODBaseDetector

from dtaianomaly.anomaly_detection._BaseDetector import BaseDetector, Supervision
from dtaianomaly.type_validation import IntegerAttribute, WindowSizeAttribute
from dtaianomaly.windowing import (
    WINDOW_SIZE_TYPE,
    compute_window_size,
    reverse_sliding_window,
    sliding_window,
)

__all__ = ["BasePyODAnomalyDetector"]


[docs] class BasePyODAnomalyDetector(BaseDetector, abc.ABC): """ Abstract class for anomaly detection based on the PyOD library :cite:`zhao2019pyod`. PyOD is a Python library for detecting anomalies in multivariate data. The anomaly detectors in PyOD typically deal with tabular data, which assumes i.i.d (independent and identically distributed) data. This is generally not the case for time series data, which has a temporal dependency. Nevertheless, the detectors of PyOD can be used for detecting anomalies in time series data. Parameters ---------- window_size : int or str The window size to use for extracting sliding windows from the time series. This value will be passed to :py:meth:`~dtaianomaly.anomaly_detection.compute_window_size`. stride : int, default=1 The stride, i.e., the step size for extracting sliding windows from the time series. **kwargs Arguments to be passed to pyod anomaly detector. Attributes ---------- window_size_ : int The effectively used window size for this anomaly detector pyod_detector_ : PyODBaseDetector The PyOD anomaly detector """ window_size: WINDOW_SIZE_TYPE stride: int kwargs: dict window_size_: int pyod_detector_: PyODBaseDetector attribute_validation = { "window_size": WindowSizeAttribute(), "stride": IntegerAttribute(1), } def __init__(self, window_size: WINDOW_SIZE_TYPE, stride: int = 1, **kwargs): super().__init__(self._supervision()) self.window_size = window_size self.stride = stride self.kwargs = kwargs # Check if the PyOD detector can be correctly initialized self._initialize_detector(**self.kwargs) @abc.abstractmethod def _initialize_detector(self, **kwargs) -> PyODBaseDetector: """ Initialize the PyOD anomaly detector. Parameters ---------- kwargs: The hyperparameters to be passed to the PyOD anomaly detector. Returns ------- detector: PyODBaseDetector A PyOD anomaly detector with the given hyperparameters. """ @abc.abstractmethod def _supervision(self) -> Supervision: """ Return the supervision of this anomaly detector. Returns ------- supervision: Supervision The supervision of this PyOD anomaly detector. """ def _fit(self, X: np.ndarray, y: np.ndarray = None, **kwargs) -> None: self.window_size_ = compute_window_size(X, self.window_size, **kwargs) self.pyod_detector_ = self._initialize_detector(**self.kwargs) self.pyod_detector_.fit(sliding_window(X, self.window_size_, self.stride)) def _decision_function(self, X: np.ndarray) -> np.array: per_window_decision_scores = self.pyod_detector_.decision_function( sliding_window(X, self.window_size_, self.stride) ) decision_scores = reverse_sliding_window( per_window_decision_scores, self.window_size_, self.stride, X.shape[0] ) return decision_scores