Source code for dtaianomaly.anomaly_detection.MedianMethod
import numpy as np
from typing import Optional
from dtaianomaly import utils
from dtaianomaly.anomaly_detection.BaseDetector import BaseDetector, Supervision
[docs]
class MedianMethod(BaseDetector):
    """
    Anomaly detection based on the Two-sided Median Method.

    The Median Method [basu2007automatic]_ computes the deviation of a sample
    compared to its neighborhood. This neighborhood is computed as a window
    around the sample. The deviation is consequently measured as the number
    of standard deviations the observation deviates from the mean of its
    neighborhood.

    In contrast to the original paper, this implementation allows defining a
    different neighborhood size before and after the sample, to fine tune how
    much lookahead is allowed. In the ultimate case, if ``neighborhood_size_after = 0``,
    then the Median Method is a purely online anomaly detector. Note, however,
    that this case differs from the One-Sided Median Method discussed in the
    original paper, which also uses the first order difference to detect anomalies.

    Parameters
    ----------
    neighborhood_size_before: int
        The number of observations before the sample to include in the neighborhood.
        Must be a strictly positive integer.
    neighborhood_size_after: int, default=None
        The number of observations after the sample to include in the neighborhood.
        Must be a non-negative integer. If None, the same value as
        ``neighborhood_size_before`` will be used.

    Raises
    ------
    TypeError
        If one of the neighborhood sizes is not an integer (booleans are rejected).
    ValueError
        If ``neighborhood_size_before`` is smaller than 1 or if
        ``neighborhood_size_after`` is negative.

    Examples
    --------
    >>> from dtaianomaly.anomaly_detection import MedianMethod
    >>> from dtaianomaly.data import demonstration_time_series
    >>> x, y = demonstration_time_series()
    >>> median_method = MedianMethod(10).fit(x)
    >>> median_method.decision_function(x)
    array([1.1851476 , 0.68191703, 1.05125284, ..., 0.81373386, 1.86097851,
           0.05852008])

    References
    ----------
    .. [basu2007automatic] Basu, Sabyasachi, and Martin Meckesheimer. "Automatic outlier
       detection for time series: an application to sensor data." Knowledge and Information
       Systems 11 (2007), 137-154, doi: `10.1007/s10115-006-0026-6 <https://doi.org/10.1007/s10115-006-0026-6>`_.
    """
    neighborhood_size_before: int
    neighborhood_size_after: Optional[int]

    def __init__(self, neighborhood_size_before: int, neighborhood_size_after: Optional[int] = None):
        super().__init__(Supervision.UNSUPERVISED)

        # ``bool`` is a subclass of ``int``, hence the explicit exclusion.
        if not isinstance(neighborhood_size_before, int) or isinstance(neighborhood_size_before, bool):
            raise TypeError("`neighborhood_size_before` should be an integer")
        if neighborhood_size_before < 1:
            raise ValueError("`neighborhood_size_before` should be strictly positive")

        if neighborhood_size_after is not None:
            if not isinstance(neighborhood_size_after, int) or isinstance(neighborhood_size_after, bool):
                raise TypeError("`neighborhood_size_after` should be an integer")
            if neighborhood_size_after < 0:
                raise ValueError("`neighborhood_size_after` can not be negative!")

        self.neighborhood_size_before = neighborhood_size_before
        self.neighborhood_size_after = neighborhood_size_after

    def fit(self, X: np.ndarray, y: Optional[np.ndarray] = None) -> 'BaseDetector':
        """
        Return this detector unchanged; both arguments are ignored.

        Parameters
        ----------
        X: np.ndarray
            Not used.
        y: np.ndarray, default=None
            Not used.

        Returns
        -------
        self: MedianMethod
            This instance.
        """
        return self  # No fitting is required.

    def decision_function(self, X: np.ndarray) -> np.ndarray:
        """
        Compute, for each observation, the absolute deviation from the mean
        of its neighborhood, divided by the standard deviation of that
        neighborhood.

        Parameters
        ----------
        X: np.ndarray
            The univariate time series to compute the decision scores for.

        Returns
        -------
        decision_scores: np.ndarray
            One-dimensional array with one score per observation. A score of
            0 is returned wherever the ratio is undefined (e.g., a
            neighborhood with zero standard deviation).

        Raises
        ------
        ValueError
            If ``X`` is not a valid array-like or is not univariate.
        """
        if not utils.is_valid_array_like(X):
            raise ValueError("Input must be numerical array-like")
        if not utils.is_univariate(X):
            raise ValueError("Input must be univariate!")

        # Convert to a numpy array. ``squeeze`` collapses a series with a
        # single observation to a 0-d array, which the windowing below can
        # not handle, so the result is forced to have at least one dimension.
        X = np.atleast_1d(np.asarray(X, dtype=float).squeeze())

        # Set the neighborhood size after the observation
        if self.neighborhood_size_after is None:
            neighborhood_size_after = self.neighborhood_size_before
        else:
            neighborhood_size_after = self.neighborhood_size_after

        # Pad with NaN so that every observation has a full-size window; the
        # NaN-aware statistics below ignore the padded positions.
        X_padded = np.pad(
            X,
            (self.neighborhood_size_before, neighborhood_size_after),
            constant_values=(np.nan,)
        )
        neighborhoods = np.lib.stride_tricks.sliding_window_view(
            X_padded,
            window_shape=(self.neighborhood_size_before + neighborhood_size_after + 1)
        )

        neighborhood_mean = np.nanmean(neighborhoods, axis=1)
        neighborhood_std = np.nanstd(neighborhoods, axis=1)

        # A zero standard deviation yields 0/0 = NaN, which is mapped to a
        # score of 0 below, so the floating point warning is only noise.
        with np.errstate(divide='ignore', invalid='ignore'):
            decision_scores = np.abs(X - neighborhood_mean) / neighborhood_std

        return np.nan_to_num(decision_scores, nan=0.0)