import abc
import enum
import os.path
import pickle
from pathlib import Path
import numpy as np
import scipy
from dtaianomaly.thresholding import ContaminationRateThreshold
from dtaianomaly.type_validation import AttributeValidationMixin, ObjectAttribute
from dtaianomaly.utils import (
CheckIsFittedMixin,
PrintConstructionCallMixin,
is_valid_array_like,
)
# Public API of this module: the names exported by a star-import.
__all__ = ["Supervision", "BaseDetector", "load_detector"]
class Supervision(enum.Enum):
    """
    Supervision types.

    An enum for the different supervision types for anomaly detectors.
    Valid supervision types are:

    - ``Unsupervised``: the anomaly detector does not need any labels or
      training data.
    - ``Semi-supervised``: the anomaly detector requires *normal* training
      data, but no training labels.
    - ``Supervised``: the anomaly detector requires both training data and
      training labels. The training data may contain anomalies.
    """

    UNSUPERVISED = 1
    SEMI_SUPERVISED = 2
    SUPERVISED = 3
class BaseDetector(
    PrintConstructionCallMixin, CheckIsFittedMixin, AttributeValidationMixin
):
    """
    Abstract base class for time series anomaly detection.

    This base class defines method signatures to build specific anomaly
    detectors. User-defined detectors can be used throughout
    ``dtaianomaly`` by extending this base class and implementing
    ``_fit`` and ``_decision_function``.

    Parameters
    ----------
    supervision : Supervision
        The type of supervision this anomaly detector requires.
    """

    # NOTE(review): ``abc.abstractmethod`` below is only enforced when one of
    # the mixins provides an ``ABCMeta`` metaclass -- confirm in the mixins.

    supervision: Supervision
    attribute_validation = {"supervision": ObjectAttribute(Supervision)}

    def __init__(self, supervision: Supervision):
        self.supervision = supervision

    def fit(
        self, X: np.ndarray, y: np.ndarray | None = None, **kwargs
    ) -> "BaseDetector":
        """
        Fit this detector.

        Validate the input and delegate the actual fitting to ``_fit``.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_attributes)
            Input time series.
        y : array-like, default=None
            Ground-truth information. Passed to ``_fit`` unchanged.
        **kwargs
            Additional parameters to be used to fit the anomaly detector.

        Returns
        -------
        BaseDetector
            Returns the instance itself.

        Raises
        ------
        ValueError
            If ``X`` is not a valid numerical array-like.
        """
        if not is_valid_array_like(X):
            raise ValueError("Input must be numerical array-like")
        self._fit(np.asarray(X), y, **kwargs)
        return self

    @abc.abstractmethod
    def _fit(self, X: np.ndarray, y: np.ndarray | None = None, **kwargs) -> None:
        """Effectively fit this detector."""

    def decision_function(self, X: np.ndarray) -> np.ndarray:
        """
        Compute anomaly scores.

        Compute the anomaly scores for the given time series using this
        detector. The input is validated first, then the fitted state is
        checked, and finally ``_decision_function`` is called.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_attributes)
            Input time series.

        Returns
        -------
        array-like of shape (n_samples)
            The computed anomaly scores.

        Raises
        ------
        ValueError
            If ``X`` is not a valid numerical array-like.
        """
        if not is_valid_array_like(X):
            raise ValueError("Input must be numerical array-like")
        self.check_is_fitted()
        return self._decision_function(np.asarray(X))

    @abc.abstractmethod
    def _decision_function(self, X: np.ndarray) -> np.ndarray:
        """Effectively compute the decision function."""

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """
        Predict anomaly probabilities.

        Estimate the probability of a sample of `X` being anomalous,
        based on the anomaly scores obtained from `decision_function`
        by rescaling them to the range of [0, 1] via min-max scaling.
        NaN scores are ignored when determining the minimum and maximum.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_attributes)
            Input time series.

        Returns
        -------
        array-like of shape (n_samples)
            1D array with the same length as `X`, with values
            in the interval [0, 1], in which a higher value
            implies that the instance is more likely to be
            anomalous.

        Raises
        ------
        ValueError
            If `X` is not a valid array.
        ValueError
            If the prediction scores from 'decision_function' are constant, but not
            in the interval [0, 1], because these values can not unambiguously be
            transformed to an anomaly probability.
        """
        if not is_valid_array_like(X):
            raise ValueError("Input must be numerical array-like")

        raw_scores = self.decision_function(X)
        min_score = np.nanmin(raw_scores)
        max_score = np.nanmax(raw_scores)

        if min_score == max_score:
            # Constant scores cannot be min-max scaled (division by zero);
            # they are only meaningful if they already are probabilities.
            if not (0.0 <= min_score <= 1.0):
                raise ValueError(
                    "The predicted anomaly scores are constant, but not in the interval [0, 1]. "
                    "It is not clear how to transform these unambiguously to anomaly-probabilities!"
                )
            return raw_scores

        return (raw_scores - min_score) / (max_score - min_score)

    def predict_confidence(
        self,
        X: np.ndarray,
        X_train: np.ndarray | None = None,
        contamination: float = 0.05,
        decision_scores_given: bool = False,
    ) -> np.ndarray:
        """
        Predict the confidence of the anomaly scores on the given test data :cite:`perini2021quantifying`.

        This method implements ExCeeD (Example-wise Confidence
        of anomaly Detectors) to estimate the confidence. ExCeeD transforms the predicted
        decision scores to probability estimates using a Bayesian approach, which enables
        to assign a confidence score to each prediction which captures the uncertainty
        of the anomaly detector in that prediction.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_attributes)
            The test time series for which the confidence of anomaly scores
            should be predicted.
        X_train : array-like of shape (n_samples_train, n_attributes), default=None
            The training time series, which can be used as reference. If
            ``X_train=None``, the test set is used as reference set.
        contamination : float, default=0.05
            The (estimated) contamination rate for the data, i.e., the expected
            percentage of anomalies.
        decision_scores_given : bool, default=False
            Whether the given ``X`` and ``X_train`` represent time series data
            or decision scores. If ``decision_scores_given=False`` (default),
            then the given arrays are interpreted as time series. Otherwise,
            they are interpreted as decision scores, as computed by
            ``decision_function()``.

        Returns
        -------
        array-like of shape (n_samples)
            The confidence of this anomaly detector in each prediction in the
            given test time series.

        Raises
        ------
        ValueError
            If ``decision_scores_given=True`` but ``X`` or ``X_train`` is not
            one-dimensional.
        """
        if decision_scores_given:
            # Accept any array-like (e.g. plain lists) as decision scores.
            decision_scores = np.asarray(X)
            if decision_scores.ndim != 1:
                raise ValueError(
                    "In the 'predict_confidence()' method, it was indicated that the decision scores are provided "
                    "as X (decision_scores_given=True), but the shape of X does not correspond to the shape of decision "
                    f"scores: {decision_scores.shape}!"
                )
            if X_train is None:
                decision_scores_train = decision_scores
            else:
                decision_scores_train = np.asarray(X_train)
                if decision_scores_train.ndim != 1:
                    raise ValueError(
                        "In the 'predict_confidence()' method, it was indicated that the decision scores are provided "
                        "as X_train (decision_scores_given=True), but the shape of X_train does not correspond to the "
                        f"shape of decision scores: {decision_scores_train.shape}!"
                    )
        else:
            decision_scores = self.decision_function(X)
            decision_scores_train = (
                decision_scores if X_train is None else self.decision_function(X_train)
            )

        # Convert the decision scores to binary predictions
        prediction = ContaminationRateThreshold(
            contamination_rate=contamination
        ).threshold(decision_scores)

        # Apply the ExCeeD method (https://github.com/Lorenzo-Perini/Confidence_AD/blob/master/ExCeeD.py).
        # ``n`` is the size of the *reference* set: the counts below range over
        # the reference scores, so using the test size would let the posterior
        # leave [0, 1] whenever both sets differ in length.
        n = decision_scores_train.shape[0]

        # For every test score, count the reference scores that are <= it.
        # Sorting once and using a binary search replaces the quadratic
        # per-element scan. NaN test scores compare False against everything,
        # so their count is forced to 0.
        sorted_train = np.sort(decision_scores_train)
        n_instances = np.searchsorted(sorted_train, decision_scores, side="right")
        n_instances = np.where(np.isnan(decision_scores), 0, n_instances)

        # Outlier probability according to ExCeeD
        posterior_prob = (1 + n_instances) / (2 + n)

        # Probability that the example would be flagged as anomalous
        confidence = 1 - scipy.stats.binom.cdf(
            n - int(n * contamination), n, posterior_prob
        )

        # If the example is classified as normal, use 1 - confidence.
        return np.where(np.asarray(prediction) == 0, 1 - confidence, confidence)

    def save(self, path: str | Path) -> None:
        """
        Save this detector.

        Save detector to disk as a pickle file with extension `.dtai`. If the given
        path consists of multiple subdirectories, then the not existing subdirectories
        are created.

        Parameters
        ----------
        path : str or Path
            Location where to store the detector. The ``.dtai`` extension is
            appended when the path does not already end with it.
        """
        target = Path(path)
        if target.suffix != ".dtai":
            # Append (rather than replace) the extension, so that a name such
            # as ``model.v2`` becomes ``model.v2.dtai``.
            target = Path(f"{path}.dtai")

        # Create missing parent directories; tolerate them already existing.
        target.parent.mkdir(parents=True, exist_ok=True)

        with open(target, "wb") as f:
            pickle.dump(self, f)
[docs]
def load_detector(path: str | Path) -> BaseDetector:
"""
Load a detector from disk.
Warning: method relies on pickle. Only load trusted files!
Parameters
----------
path : str or Path
Location of the stored detector.
Returns
-------
BaseDetector
The loaded detector.
"""
with open(path, "rb") as f:
detector = pickle.load(f)
return detector