Source code for dtaianomaly.evaluation._range_based_metrics

import abc
from typing import Literal

import numpy as np

from dtaianomaly.evaluation._BinaryMetric import BinaryMetric
from dtaianomaly.evaluation._FBetaMixin import FBetaMixin
from dtaianomaly.type_validation import FloatAttribute, LiteralAttribute
from dtaianomaly.utils import make_intervals

__all__ = ["RangeBasedPrecision", "RangeBasedFBeta", "RangeBasedRecall"]

_IntervalType = tuple[int, int]
_DeltaType = Literal["flat", "front", "back", "middle"]
_GammaType = Literal["one", "reciprocal"]


def _interval_overlap(a: _IntervalType, b: _IntervalType) -> _IntervalType | None:
    start = max(a[0], b[0])
    end = min(a[1], b[1])
    return (start, end) if start < end else None


def _omega(
    anomaly_range: _IntervalType,
    overlap_set: _IntervalType | None,
    delta: _DeltaType,
) -> float:
    # Figure 2.a
    if overlap_set is None:
        return 0
    my_value = 0
    max_value = 0
    anomaly_length = anomaly_range[1] - anomaly_range[0]
    for i in range(1, anomaly_length + 1):
        bias = _delta(delta, i, anomaly_length)
        max_value += bias
        if overlap_set[0] <= anomaly_range[0] + i - 1 < overlap_set[1]:
            my_value += bias
    return my_value / max_value


def _delta(delta: _DeltaType, i: int, anomaly_length: int) -> float:
    # Figure 2.b
    if delta == "flat":
        return 1
    elif delta == "front":
        return anomaly_length - i + 1
    elif delta == "back":
        return i
    elif delta == "middle":
        return i if i <= anomaly_length / 2 else anomaly_length - i + 1
    else:  # Custom method
        raise ValueError(f"Invalid value for delta given: '{delta}")


def _gamma(gamma: _GammaType, nb_overlapping_intervals: int) -> float:
    if gamma == "one":
        return 1
    elif gamma == "reciprocal":
        return 1 / nb_overlapping_intervals
    else:  # Custom method
        raise ValueError(f"Invalid value for gamma given: '{gamma}")


def _existence_reward(
    interval: _IntervalType, other_intervals: list[_IntervalType]
) -> float:
    # Equation (5)
    for other_interval in other_intervals:
        if _interval_overlap(interval, other_interval) is not None:
            return 1
    return 0


def _overlap_reward(
    interval: _IntervalType,
    other_intervals: list[_IntervalType],
    delta: _DeltaType,
    gamma: _GammaType,
) -> float:
    # Equation (6)
    return _cardinality_factor(interval, other_intervals, gamma) * sum(
        [
            _omega(interval, _interval_overlap(interval, other_interval), delta)
            for other_interval in other_intervals
        ]
    )


def _cardinality_factor(
    interval: _IntervalType, other_intervals: list[_IntervalType], gamma: _GammaType
) -> float:
    # Equation (7)
    nb_overlapping_intervals = 0
    for other_interval in other_intervals:
        if _interval_overlap(interval, other_interval) is not None:
            nb_overlapping_intervals += 1
    return (
        1 if nb_overlapping_intervals <= 1 else _gamma(gamma, nb_overlapping_intervals)
    )


def _precision_interval(
    interval: _IntervalType,
    ground_truth_intervals: list[_IntervalType],
    delta: _DeltaType,
    gamma: _GammaType,
) -> float:
    # Equation (9)
    return _overlap_reward(interval, ground_truth_intervals, delta, gamma)


def _recall_interval(
    interval: _IntervalType,
    predicted_intervals: list[_IntervalType],
    alpha: float,
    delta: _DeltaType,
    gamma: _GammaType,
) -> float:
    # Equation (4)
    return alpha * _existence_reward(interval, predicted_intervals) + (
        1 - alpha
    ) * _overlap_reward(interval, predicted_intervals, delta, gamma)


class RangeBasedMetricBasePrecision(BinaryMetric, abc.ABC):
    delta: _DeltaType
    gamma: _GammaType
    attribute_validation = {
        "delta": LiteralAttribute("flat", "front", "back", "middle"),
        "gamma": LiteralAttribute("one", "reciprocal"),
    }

    def __init__(self, delta: _DeltaType = "flat", gamma: _GammaType = "reciprocal"):
        self.delta = delta
        self.gamma = gamma

    def _precision(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        # Add 1 to ends, because make_intervals returns closed intervals while the code here assumes half-open intervals
        gt_starts, gt_ends = make_intervals(y_true)
        pred_starts, pred_ends = make_intervals(y_pred)

        ground_truth_intervals = list(zip(gt_starts, gt_ends + 1))
        precision_T = [
            _precision_interval(
                interval, ground_truth_intervals, self.delta, self.gamma
            )
            for interval in zip(pred_starts, pred_ends + 1)
        ]

        return sum(precision_T) / pred_starts.shape[0]


class RangeBasedMetricBasePrecisionRecall(RangeBasedMetricBasePrecision, abc.ABC):
    alpha: float

    attribute_validation = {
        "alpha": FloatAttribute(0.0, 1.0),
    }

    def __init__(
        self,
        alpha: float = 0.5,
        delta: _DeltaType = "flat",
        gamma: _GammaType = "reciprocal",
    ):
        super().__init__(delta, gamma)
        self.alpha = alpha

    def _recall(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        # Add 1 to ends, because make_intervals returns closed intervals while the code here assumes half-open intervals
        gt_starts, gt_ends = make_intervals(y_true)
        pred_starts, pred_ends = make_intervals(y_pred)

        predicted_intervals = list(zip(pred_starts, pred_ends + 1))
        recall_T = [
            _recall_interval(
                interval, predicted_intervals, self.alpha, self.delta, self.gamma
            )
            for interval in zip(gt_starts, gt_ends + 1)
        ]

        return sum(recall_T) / gt_starts.shape[0]


[docs] class RangeBasedPrecision(RangeBasedMetricBasePrecision): """ Computes the range-based precision score :cite:`tatbul2018precision`. The range-based precision computes a precision-score for each predicted anomalous range and then takes the average over all ranges. This precision-score consists of two parts: (1) the amount of overlap between the predicted range and the ground truth ranges, and (2) whether the predicted range overlaps with only one or multiple ground truth ranges. These components can be computed independently, and are multiplied to get a final precision-score for the range. Parameters ---------- delta : str, default='flat' Bias for the position of the predicted anomaly in the ground truth anomalous range. Valid options are: - ``'flat'``: Equal bias towards all positions in the ground truth anomalous range. - ``'front'``: Predictions that are near the front of the ground truth anomaly (i.e. early detection) have a higher weight. - ``'back'``: Predictions that are near the end of the ground truth anomaly (i.e. late detection) have a higher weight. - ``'middle'``: Predictions that are near the center of the ground truth anomaly have a higher weight. gamma : str, default='reciprocal' Penalization approach for detecting multiple ranges with a single range. Valid options are: - ``'one'``: Fragmented detection should not be penalized. - ``'reciprocal'``: Weight fragmented detection of :math:´N´ ranges with as single range by a factor of :math:´1/N´. Warnings -------- Note that, while tuning a metric to some domain is beneficial in practical applications, this flexibility makes it difficult for a large-scale, general-purpose evaluation of multiple anomaly detectors, as you can optimize the metric for a specific application. See Also -------- RangeBasedPrecision: Compute the range-based precision score. RangeBasedRecall: Compute the range-based recall score. Examples -------- >>> from dtaianomaly.evaluation import RangeBasedPrecision >>> metric = RangeBasedPrecision() >>> y_true = [0, 0, 0, 1, 1, 0, 0, 0] >>> y_pred = [1, 0, 0, 1, 1, 1, 0, 0] >>> metric.compute(y_true, y_pred) # doctest: +ELLIPSIS 0.333... """ def _compute(self, y_true: np.ndarray, y_pred: np.ndarray, **kwargs) -> float: return self._precision(y_true, y_pred)
[docs] class RangeBasedRecall(RangeBasedMetricBasePrecisionRecall): """ Computes the range-based recall score :cite:`tatbul2018precision`. The range-based recall computes a recall-score for each ground truth anomalous range and then takes the average over all ranges. This recall-score consists of three parts: (1) the amount of overlap between the ground truth range and the predicted ranges, (2) whether the ground truth range overlaps with only one or multiple predicted ranges, and (3) whether the final ground truth range is detected at all. Components (1) and (2) are computed independently and multiplied, of which the result is combined with component (3) through a convex combination to get a final recall-score for the ground truth range. Parameters ---------- alpha : float, default=0.5 The importance of detecting the events (even if it is only a single detected point) compared to detecting a large portion of the ground truth events. Should be at least 0 and at most 1. delta : str, default='flat' Bias for the position of the predicted anomaly in the ground truth anomalous range. Valid options are: - ``'flat'``: Equal bias towards all positions in the ground truth anomalous range. - ``'front'``: Predictions that are near the front of the ground truth anomaly (i.e. early detection) have a higher weight. - ``'back'``: Predictions that are near the end of the ground truth anomaly (i.e. late detection) have a higher weight. - ``'middle'``: Predictions that are near the center of the ground truth anomaly have a higher weight. gamma : str, default='reciprocal' Penalization approach for detecting multiple ranges with a single range. Valid options are: - ``'one'``: Fragmented detection should not be penalized. - ``'reciprocal'``: Weight fragmented detection of :math:´N´ ranges with as single range by a factor of :math:´1/N´. Warnings -------- Note that, while tuning a metric to some domain is beneficial in practical applications, this flexibility makes it difficult for a large-scale, general-purpose evaluation of multiple anomaly detectors, as you can optimize the metric for a specific application See Also -------- RangeBasedPrecision: Compute the range-based precision score. RangeBasedFBeta: Compute the range-based :math:`F_\\beta` score. Examples -------- >>> from dtaianomaly.evaluation import RangeBasedRecall >>> metric = RangeBasedRecall() >>> y_true = [0, 0, 0, 1, 1, 0, 0, 0] >>> y_pred = [1, 0, 0, 1, 1, 1, 0, 0] >>> metric.compute(y_true, y_pred) 1.0 """ def _compute(self, y_true: np.ndarray, y_pred: np.ndarray, **kwargs) -> float: return self._recall(y_true, y_pred)
[docs] class RangeBasedFBeta(RangeBasedMetricBasePrecisionRecall, FBetaMixin): """ Computes the range-based :math:`F_\\beta` score :cite:`tatbul2018precision`. The range-based :math:`F_\\beta`-score equals the harmonic mean of the range-based precision and range-based recall. The metrics take into account three parts: (1) the amount of overlap between the ground truth ranges and the predicted ranges, (2) whether there is fragmented detection or not, and (3) whether the ground truth ranges are detected at all. Parameters ---------- beta : int, float, default=1 Desired beta parameter. alpha : float, default=0.5 The importance of detecting the events (even if it is only a single detected point) compared to detecting a large portion of the ground truth events. Should be at least 0 and at most 1. delta : str, default='flat' Bias for the position of the predicted anomaly in the ground truth anomalous range. Valid options are: - ``'flat'``: Equal bias towards all positions in the ground truth anomalous range. - ``'front'``: Predictions that are near the front of the ground truth anomaly (i.e. early detection) have a higher weight. - ``'back'``: Predictions that are near the end of the ground truth anomaly (i.e. late detection) have a higher weight. - ``'middle'``: Predictions that are near the center of the ground truth anomaly have a higher weight. gamma : str, default='reciprocal' Penalization approach for detecting multiple ranges with a single range. Valid options are: - ``'one'``: Fragmented detection should not be penalized. - ``'reciprocal'``: Weight fragmented detection of :math:´N´ ranges with as single range by a factor of :math:´1/N´. Warnings -------- Note that, while tuning a metric to some domain is beneficial in practical applications, this flexibility makes it difficult for a large-scale, general-purpose evaluation of multiple anomaly detectors, as you can optimize the metric for a specific application. See Also -------- RangeBasedPrecision: Compute the range-based precision score. RangeBasedRecall: Compute the range-based recall score. Examples -------- >>> from dtaianomaly.evaluation import RangeBasedFBeta >>> metric = RangeBasedFBeta() >>> y_true = [0, 0, 0, 1, 1, 0, 0, 0] >>> y_pred = [1, 0, 0, 1, 1, 1, 0, 0] >>> metric.compute(y_true, y_pred) 0.5 """ def __init__( self, beta: int | float = 1.0, alpha: float = 0.5, delta: _DeltaType = "flat", gamma: _GammaType = "reciprocal", ): RangeBasedMetricBasePrecisionRecall.__init__(self, alpha, delta, gamma) FBetaMixin.__init__(self, beta) def _compute(self, y_true: np.ndarray, y_pred: np.ndarray, **kwargs) -> float: return self._f_score( precision=self._precision(y_true, y_pred), recall=self._recall(y_true, y_pred), )