import warnings
from collections.abc import Iterable
import numba as nb
import numpy as np
from numba.typed import List as nb_List
from numpy.lib.stride_tricks import sliding_window_view
from sklearn.covariance import EmpiricalCovariance
from dtaianomaly import utils
from dtaianomaly.anomaly_detection.BaseDetector import BaseDetector, Supervision
[docs]
class DWT_MLEAD(BaseDetector):
"""
Anomaly detection based on the Discrete Wavelet Transform :cite:`thill2017time`.
DWT-MLEAD (Discrete Wavelet Transform and Maximum Likelihood Estimation
for Anomaly Detection) first performs multilevel DWT
using Haar wavelets. Next, for each window in the obtained coefficients,
a likelihood is estimated using a Guassian distribution. A boundary on
the likelihood is computed within each DWT-level based on the quantiles,
and the likelihood estimates that are below the boundary are flagged as
anomalous. The final anomaly score is then computed as the number of
times an observation was in an anomalous window.
Parameters
----------
start_level: int, default=3
The first level for computing the Discrete Wavelet Transform.
quantile_boundary_type: {'percentile'}, default='percentile'
Method for putting a boundary on the likelihood estimates within each DWT-level.
``'percentile'`` will consider a ``quantile_epsilon`` of the windows as anomalous.
quantile_epsilon: float, default=0.01
The percentile used as threshold on the likelihood estimates.
padding_mode: {'constant', 'edge', 'linear_ramp', 'maximum', 'mean', 'median', 'minimum', 'reflect', 'symmetric', 'wrap', 'empty'}, default='wrap'
Mode for padding the time series, which is passed to `numpy.pad <https://numpy.org/doc/stable/reference/generated/numpy.pad.html>`_.
Examples
--------
>>> from dtaianomaly.anomaly_detection import DWT_MLEAD
>>> from dtaianomaly.data import demonstration_time_series
>>> x, y = demonstration_time_series()
>>> dwt_mlead = DWT_MLEAD() # No fitting is necessary
>>> dwt_mlead.decision_function(x) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
array([ 0., 0., 0., ..., 12., 12., 12.]...)
Notes
-----
The implementation is based on `aeon <https://github.com/aeon-toolkit/aeon/blob/main/aeon/anomaly_detection/distribution_based/_dwt_mlead.py>`_
and `TimeEval <https://github.com/TimeEval/TimeEval-algorithms/blob/main/dwt_mlead/dwt_mlead.py>`_.
These made the following modifications compared to original paper [thill2017time]_:
- We use window sizes for the DWT coefficients that decrease with the level number
because otherwise we would have too few items to slide the window over.
- We exclude the highest level coefficients because they contain only a single entry
and are, thus, not suitable for sliding a window of length 2 over it.
- We have not implemented the Monte Carlo quantile boundary type yet.
- We do not perform the anomaly clustering step to determine the anomaly centers.
Instead, we return the anomaly scores for each timestep in the original time
series.
In addition, we add the following extension:
- aeon uses ``'wrap'`` padding and TimeEval uses ``'periodic'`` padding. Initial
experiments show that different values may lead to quite different anomaly scores.
Therefore, we included the padding as a parameter of DWT-MLEAD.
References
----------
.. [thill2017time] Thill, Markus, Wolfgang Konen, and Thomas Bäck. "Time series anomaly
detection with discrete wavelet transforms and maximum likelihood estimation." International
Conference on Time Series (ITISE). Vol. 2. 2017.
"""
start_level: int
quantile_boundary_type: str
quantile_epsilon: float
padding_mode: str
def __init__(
self,
start_level: int = 3,
quantile_boundary_type: str = "percentile",
quantile_epsilon: float = 0.01,
padding_mode: str = "wrap",
):
super().__init__(Supervision.UNSUPERVISED)
# Check start level
if not isinstance(start_level, int) or isinstance(start_level, bool):
raise TypeError("`start_level` should be numeric")
if start_level < 0:
raise ValueError("`start_level` must be >= 0")
# Check quantile boundary type
if not isinstance(quantile_boundary_type, str):
raise TypeError("'quantile_boundary_type' should be a string")
if quantile_boundary_type not in ["percentile", "monte-carlo"]:
raise ValueError(
"`quantile_boundary_type` must be 'percentile' or 'monte-carlo', "
f"but is {quantile_boundary_type}"
)
if quantile_boundary_type in ["monte-carlo"]:
raise NotImplementedError(
f"The quantile boundary type '{quantile_boundary_type}' "
"is not implemented yet!"
)
# Check quantile epsilon
if not isinstance(quantile_epsilon, (float, int)) or isinstance(
quantile_epsilon, bool
):
raise TypeError("`quantile_epsilon` should be numeric")
if quantile_epsilon < 0 or quantile_epsilon > 1:
raise ValueError("quantile_epsilon must be in [0, 1]")
# Check quantile epsilon
if not isinstance(padding_mode, str):
raise TypeError("'padding_mode' should be a string")
np.pad(np.empty(10), (0, 3), mode=padding_mode)
# Set the parameters
self.start_level = start_level
self.quantile_boundary_type = quantile_boundary_type
self.quantile_epsilon = quantile_epsilon
self.padding_mode = padding_mode
def _fit(self, X: np.ndarray, y: np.ndarray = None, **kwargs) -> None:
"""Should not do anything."""
def _decision_function(self, X: np.ndarray) -> np.array:
# Make sure that X is univariate
if not utils.is_univariate(X):
raise ValueError("Input must be univariate!")
X = X.squeeze().astype(float)
# Format the data
X, n, m = _pad_series(X, self.padding_mode)
max_level = int(np.log2(m))
# Check if the start level is not too big
if self.start_level >= max_level:
raise ValueError(
f"start_level ({self.start_level}) must be less than "
f"log_2(n_timepoints) ({max_level})"
)
# perform multilevel DWT and capture coefficients
levels, approx_coeffs, detail_coeffs = self._multilevel_dwt(X, max_level)
# extract anomalies in each level
window_sizes = np.array(
[
max(2, max_level - level - self.start_level + 1)
for level in range(max_level)
],
dtype=np.int_,
)
coef_anomaly_counts = []
for x, level in zip(
_combine_alternating(detail_coeffs, approx_coeffs), levels.repeat(2, axis=0)
):
w = window_sizes[level]
windows = sliding_window_view(x, w)
p = self._estimate_gaussian_likelihoods(windows)
a = self._mark_anomalous_windows(p)
xa = self._reverse_windowing(a, window_length=w, full_length=x.shape[0])
coef_anomaly_counts.append(xa)
# aggregate anomaly counts (leaf counters)
point_anomaly_scores = self._push_anomaly_counts_down_to_points(
coef_anomaly_counts, m, n
)
return point_anomaly_scores
def _multilevel_dwt(
self, X: np.ndarray, max_level: int
) -> (np.ndarray, list[np.ndarray], list[np.ndarray]):
ls_ = np.arange(self.start_level - 1, max_level - 1, dtype=np.int_) + 1
as_, ds_ = _multilevel_haar_transform(X, max_level - 1)
as_ = as_[self.start_level :]
ds_ = ds_[self.start_level - 1 :]
return ls_, as_, ds_
@staticmethod
def _estimate_gaussian_likelihoods(x_windows: np.ndarray) -> np.ndarray:
with warnings.catch_warnings():
warnings.filterwarnings(action="ignore", category=UserWarning)
# fit gaussion distribution with mean and covariance
estimator = EmpiricalCovariance(assume_centered=False)
estimator.fit(x_windows)
# compute log likelihood for each window x in x_view
n_windows = x_windows.shape[0]
p = np.empty(shape=n_windows)
for i in range(n_windows):
p[i] = estimator.score(x_windows[i].reshape(1, -1))
return p
def _mark_anomalous_windows(self, p: np.ndarray) -> np.ndarray:
if self.quantile_boundary_type == "percentile":
# Surpress the warning
with warnings.catch_warnings():
warnings.filterwarnings(
action="ignore",
category=RuntimeWarning,
message="invalid value encountered in subtract",
)
z_eps = np.percentile(p, self.quantile_epsilon * 100)
else:
raise ValueError(
f"The quantile boundary type '{self.quantile_boundary_type}' "
"is not implemented yet!"
)
return p < z_eps
@staticmethod
def _reverse_windowing(
x: np.ndarray, window_length: int, full_length: int
) -> np.ndarray:
mapped = np.full(shape=(full_length, window_length), fill_value=0)
mapped[: x.shape[0], 0] = x
for w in range(1, window_length):
mapped[:, w] = np.roll(mapped[:, 0], w)
return np.sum(mapped, axis=1)
@staticmethod
def _push_anomaly_counts_down_to_points(
coef_anomaly_counts: list[np.ndarray], m: int, n: int
) -> np.ndarray:
# sum up counters of detail coeffs (orig. D^l) and approx coeffs (orig. C^l)
anomaly_counts = coef_anomaly_counts[0::2] + coef_anomaly_counts[1::2]
# extrapolate anomaly counts to the original series' points
counter = np.zeros(m)
for ac in anomaly_counts:
counter += ac.repeat(m // ac.shape[0], axis=0)
# set event counters with count < 2 to 0
counter[counter < 2] = 0
return counter[:n]
def _pad_series(x: np.ndarray, padding_mode) -> (np.ndarray, int, int):
"""Pad input signal to the next power of 2."""
n = x.shape[0]
exp = np.ceil(np.log2(n))
m = int(np.power(2, exp))
return np.pad(x, (0, m - n), mode=padding_mode), n, m
def _combine_alternating(xs: list, ys: list) -> Iterable:
"""Combine two lists by alternating their elements."""
for x, y in zip(xs, ys):
yield x
yield y
def _multilevel_haar_transform(
x: np.ndarray, levels: int = 1
) -> (list[np.ndarray], list[np.ndarray]):
"""Perform the multilevel discrete Haar wavelet transform on a given signal.
Captures the approximate and detail coefficients per level. The approximate
coefficients contain one more element than the detail coefficients.
Parameters
----------
x : np.ndarray
The input signal.
levels : int
The number of levels to perform the Haar wavelet transform.
Returns
-------
Tuple[List[np.ndarray], List[np.ndarray]]
The approximate and detail coefficients per level.
"""
N = len(x)
max_levels = np.floor(np.log2(N))
if levels > max_levels:
raise ValueError(
f"The level ({levels}) must be less than log_2(N) ({max_levels})."
)
res_approx, res_detail = _haar_transform_iterative(x, levels)
return res_approx, res_detail
@nb.njit(cache=True, fastmath=True)
def _haar_transform_iterative(
x: np.ndarray, levels: int
) -> (nb_List[np.ndarray], nb_List[np.ndarray]):
# initialize
l_approx = nb_List()
l_approx.append(x)
l_detail = nb_List()
for _ in range(1, levels + 1):
approx = l_approx[-1]
l_approx.append(_haar_approx_coefficients(approx))
l_detail.append(_haar_detail_coefficients(approx))
return l_approx, l_detail
@nb.njit(cache=True, fastmath=True)
def _haar_approx_coefficients(arr: np.ndarray) -> np.ndarray:
"""Get the approximate coefficients at a given level."""
if len(arr) == 1:
return np.array([arr[0]])
N = int(np.floor(len(arr) / 2))
new = np.empty(N, dtype=arr.dtype)
for i in range(N):
new[i] = (arr[2 * i] + arr[2 * i + 1]) / np.sqrt(2)
return new
@nb.njit(cache=True, fastmath=True)
def _haar_detail_coefficients(arr: np.ndarray) -> np.ndarray:
"""Get the detail coefficients at a given level."""
if len(arr) == 1:
return np.array([arr[0]])
N = int(np.floor(len(arr) / 2))
new = np.empty(N, dtype=arr.dtype)
for i in range(N):
new[i] = (arr[2 * i] - arr[2 * i + 1]) / np.sqrt(2)
return new