import abc
from typing import Optional, Union
import numpy as np
from pyod.models.base import BaseDetector as PyODBaseDetector
from dtaianomaly.anomaly_detection.BaseDetector import BaseDetector, Supervision
from dtaianomaly.anomaly_detection.windowing_utils import (
check_is_valid_window_size,
compute_window_size,
reverse_sliding_window,
sliding_window,
)
[docs]
class PyODAnomalyDetector(BaseDetector, abc.ABC):
"""
Abstract class for anomaly detection based on the PyOD library.
PyOD [zhao2019pyod]_ is a Python library for detecting anomalies in multivariate
data. The anomaly detectors in PyOD typically deal with tabular data, which assumes
i.i.d (independent and identically distributed) data. This is generally not the
case for time series data, which has a temporal dependency. Nevertheless, the detectors
of PyOD can be used for detecting anomalies in time series data.
Parameters
----------
window_size: int or str
The window size to use for extracting sliding windows from the time series. This
value will be passed to :py:meth:`~dtaianomaly.anomaly_detection.compute_window_size`.
stride: int, default=1
The stride, i.e., the step size for extracting sliding windows from the time series.
**kwargs:
Arguments to be passed to pyod anomaly detector
Attributes
----------
window_size_: int
The effectively used window size for this anomaly detector
pyod_detector_ : SklearnLocalOutlierFactor
The PyOD anomaly detector
References
----------
.. [zhao2019pyod] Zhao, Y., Nasrullah, Z. and Li, Z., 2019. PyOD: A Python Toolbox
for Scalable Outlier Detection. Journal of machine learning research (JMLR), 20(96),
pp.1-7.
"""
window_size: Union[int, str]
stride: int
kwargs: dict
window_size_: int
pyod_detector_: PyODBaseDetector
def __init__(self, window_size: Union[str, int], stride: int = 1, **kwargs):
super().__init__(self._supervision())
check_is_valid_window_size(window_size)
if not isinstance(stride, int) or isinstance(stride, bool):
raise TypeError("`stride` should be an integer")
if stride < 1:
raise ValueError("`stride` should be strictly positive")
self.window_size = window_size
self.stride = stride
self.kwargs = kwargs
# Check if the PyOD detector can be correctly initialized
self._initialize_detector(**self.kwargs)
@abc.abstractmethod
def _initialize_detector(self, **kwargs) -> PyODBaseDetector:
"""
Initialize the PyOD anomaly detector.
Parameters
----------
kwargs:
The hyperparameters to be passed to the PyOD anomaly detector.
Returns
-------
detector: PyODBaseDetector
A PyOD anomaly detector with the given hyperparameters.
"""
@abc.abstractmethod
def _supervision(self) -> Supervision:
"""
Return the supervision of this anomaly detector.
Returns
-------
supervision: Supervision
The supervision of this PyOD anomaly detector.
"""
def _fit(self, X: np.ndarray, y: Optional[np.ndarray] = None, **kwargs) -> None:
self.window_size_ = compute_window_size(X, self.window_size, **kwargs)
self.pyod_detector_ = self._initialize_detector(**self.kwargs)
self.pyod_detector_.fit(sliding_window(X, self.window_size_, self.stride))
def _decision_function(self, X: np.ndarray) -> np.array:
per_window_decision_scores = self.pyod_detector_.decision_function(
sliding_window(X, self.window_size_, self.stride)
)
decision_scores = reverse_sliding_window(
per_window_decision_scores, self.window_size_, self.stride, X.shape[0]
)
return decision_scores