Source code for dtaianomaly.anomaly_detection._LocalPolynomialApproximation

# Based on: https://gist.github.com/kadereub/9eae9cff356bb62cdbd672931e8e5ec4
import numba as nb
import numpy as np

from dtaianomaly import utils
from dtaianomaly.anomaly_detection._BaseDetector import BaseDetector, Supervision
from dtaianomaly.type_validation import BoolAttribute, IntegerAttribute

__all__ = ["LocalPolynomialApproximation"]


[docs] class LocalPolynomialApproximation(BaseDetector): """ Perform anomaly detection based on local polynomial approximations :cite:`li2007unifying`. The local polynomial approximation learns a forward estimate :math:`\\hat{x}_t^{(f)} = f(x_{t-1})` and a backward estimate :math:`\\hat{x}_t^{(b)} = b(x_{t+1})` for each observation in the time series. Both :math:`f` and :math:`b` are polynomials of a specified degree, and are estimated using a neighborhood before and after :math:`x_t`, respectively. The error of both estimators is computed, and the maximum error is taken as anomaly score, under the assumption anomalies have both a large forward and backward error. Parameters ---------- neighborhood : int The size of the neighborhood for estimating the polynomials. The window size, the length of the subsequences that will be detected as anomalies. This value will be passed to :py:meth:`~dtaianomaly.anomaly_detection.compute_window_size`. power : int, default=1 The power of the polynomial to fit. While any strictly positive value is valid, :cite:`li2007unifying` indicate that the local polynomial fit with odd order is better than that with an even order. normalize_variance : bool, default=false Wether to normalize the forward and backward estimates based on the variance of the forward and backward neighborhood, respectively. buffer : int, default=16 A buffer at the start and end of the time series, used to ensure that sufficient data is available for fitting each polynomial. The buffer must be at least 3 to ensure that there is at least a single data point at the beginning and ending of the time series. Notes ----- - LocalPolynomialApproximation only handles univariate time series. - The original version of :cite:t:`li2007unifying` normalizes the forward and backward scores. Their approach requires two additional parameters. Therefore, we did not implement this and leave normalizatin of the (aggregate) scores to post-processing. Examples -------- >>> from dtaianomaly.anomaly_detection import LocalPolynomialApproximation >>> from dtaianomaly.data import demonstration_time_series >>> x, y = demonstration_time_series() >>> poly = LocalPolynomialApproximation(neighborhood=50).fit(x) >>> poly.decision_function(x) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE array([0., 0., 0., ..., 0., 0., 0.]...) """ neighborhood: int power: int normalize_variance: bool buffer: int attribute_validation = { "neighborhood": IntegerAttribute(2), "power": IntegerAttribute(1), "normalize_variance": BoolAttribute(), "buffer": IntegerAttribute(3), } def __init__( self, neighborhood: int, power: int = 1, normalize_variance: bool = False, buffer: int = 16, ): super().__init__(Supervision.UNSUPERVISED) self.neighborhood = neighborhood self.power = power self.normalize_variance = normalize_variance self.buffer = buffer def _fit(self, X: np.ndarray, y: np.ndarray = None, **kwargs) -> None: if not utils.is_univariate(X): raise ValueError( "'LocalPolynomialApproximation' must receive univariate data!" ) def _decision_function(self, X: np.ndarray) -> np.array: if not utils.is_univariate(X): raise ValueError( "'LocalPolynomialApproximation' must receive univariate data!" ) return _local_poly_nb_parallel( X=X.squeeze(), neighborhood=self.neighborhood, power=self.power, buffer=self.buffer, normalize_variance=self.normalize_variance, )
@nb.njit(fastmath=True) def _coeff_mat(x, deg): mat_ = np.zeros(shape=(x.shape[0], deg + 1)) const = np.ones_like(x) mat_[:, 0] = const mat_[:, 1] = x if deg > 1: for n in range(2, deg + 1): mat_[:, n] = x**n return mat_ @nb.njit(fastmath=True) def _fit_x(a, b): return np.linalg.lstsq(a, b)[0] @nb.njit(fastmath=True) def fit_poly(x, y, deg): a = _coeff_mat(x, deg) p = _fit_x(a, y) # Reverse order so p[0] is coefficient of highest order return p[::-1] @nb.njit(fastmath=True) def eval_polynomial(P, x): """ Compute polynomial P(x) where P is a vector of coefficients, highest order coefficient at P[0]. Uses Horner's Method. """ result = 0 for coeff in P: result = x * result + coeff return result @nb.njit(parallel=True) def _local_poly_nb_parallel( X, neighborhood=64, power=1, buffer=16, normalize_variance=False ): decision_scores = np.zeros(shape=(X.shape[0])) for t in nb.prange(buffer + 1, X.shape[0] - buffer): # Fit the forward estimator forward_neighborhood = X[max(0, t - neighborhood) : t] forward_polynomial = fit_poly( forward_neighborhood[:-1], forward_neighborhood[1:], power ) # Fit the backward estimator backward_neighborhood = X[t + 1 : t + neighborhood + 1] backward_polynomial = fit_poly( backward_neighborhood[1:], backward_neighborhood[:-1], power ) # Compute the scores forward_score = (X[t] - eval_polynomial(forward_polynomial, X[t - 1])) ** 2 backward_score = (X[t] - eval_polynomial(backward_polynomial, X[t + 1])) ** 2 # Normalize the variance, if requested if normalize_variance: forward_var = np.var(forward_neighborhood) if forward_var != 0: forward_score /= forward_var**2 backward_var = np.var(backward_neighborhood) if backward_var != 0: backward_score /= backward_var**2 decision_scores[t] = max(forward_score, backward_score) return decision_scores