import abc
from collections.abc import Callable
import numpy as np
from dtaianomaly.evaluation._common import FBetaBase, make_intervals
from dtaianomaly.evaluation.metrics import BinaryMetric
# An interval given as a ``(start, end)`` pair of integer positions.
_IntervalType = tuple[int, int]
# Positional bias: either the name of a predefined bias, or a custom callable
# ``(position, anomaly_length) -> weight``.
_DeltaType = str | Callable[[int, int], float]
# Cardinality penalty: either the name of a predefined penalty, or a custom
# callable ``(nb_overlapping_intervals) -> factor``.
_GammaType = str | Callable[[int], float]
def _interval_overlap(a: _IntervalType, b: _IntervalType) -> _IntervalType | None:
    """
    Intersect two intervals given as ``(start, end)`` pairs.

    Parameters
    ----------
    a: tuple of (int, int)
        The first interval.
    b: tuple of (int, int)
        The second interval.

    Returns
    -------
    tuple of (int, int) or None
        The ``(start, end)`` of the part shared by both intervals, or ``None``
        if the shared part is empty. Intervals that only touch (the end of one
        equals the start of the other) do not overlap.
    """
    lower = max(a[0], b[0])
    upper = min(a[1], b[1])
    # An intersection must span at least one position.
    if not lower < upper:
        return None
    return lower, upper
def _omega(
    anomaly_range: _IntervalType,
    overlap_set: _IntervalType | None,
    delta: _DeltaType,
) -> float:
    """
    Compute the weighted fraction of ``anomaly_range`` covered by ``overlap_set``, see Figure 2.a.

    Parameters
    ----------
    anomaly_range: tuple of (int, int)
        The ``(start, end)`` of the range that is being scored.
    overlap_set: tuple of (int, int) or None
        The ``(start, end)`` of the overlap with another range, or ``None``
        if there is no overlap.
    delta: str or callable
        The positional bias, used to weigh each position of ``anomaly_range``.

    Returns
    -------
    float
        The summed weight of the positions inside ``overlap_set`` divided by
        the summed weight of all positions, or 0 if ``overlap_set`` is ``None``.
    """
    if overlap_set is None:
        return 0

    range_start = anomaly_range[0]
    length = anomaly_range[1] - range_start
    overlap_start, overlap_end = overlap_set[0], overlap_set[1]

    # Weight of every position in the range; positions are 1-based for the bias.
    weights = [_delta(delta, position, length) for position in range(1, length + 1)]

    total_weight = sum(weights)
    covered_weight = sum(
        weight
        for offset, weight in enumerate(weights)
        if overlap_start <= range_start + offset < overlap_end
    )
    return covered_weight / total_weight
def _delta(delta: _DeltaType, i: int, anomaly_length: int) -> float:
# Figure 2.b
if delta == "flat":
return 1
elif delta == "front":
return anomaly_length - i + 1
elif delta == "back":
return i
elif delta == "middle":
return i if i <= anomaly_length / 2 else anomaly_length - i + 1
else: # Custom method
return delta(i, anomaly_length)
def _gamma(gamma: _GammaType, nb_overlapping_intervals: int) -> float:
    """
    Compute the penalization factor for overlapping with multiple intervals.

    Parameters
    ----------
    gamma: str or callable
        Either ``'one'`` or ``'reciprocal'``. Any other value is called as
        ``gamma(nb_overlapping_intervals)``.
    nb_overlapping_intervals: int
        The number of intervals that overlap with the interval being scored.

    Returns
    -------
    float
        1 for ``'one'``, ``1 / nb_overlapping_intervals`` for ``'reciprocal'``,
        and otherwise the value returned by the custom method.
    """
    if gamma == "one":
        return 1
    if gamma == "reciprocal":
        return 1 / nb_overlapping_intervals
    # Custom method
    return gamma(nb_overlapping_intervals)
def _existence_reward(
    interval: _IntervalType, other_intervals: list[_IntervalType]
) -> float:
    """
    Reward the mere detection of ``interval``, see Equation (5).

    Parameters
    ----------
    interval: tuple of (int, int)
        The interval that is being scored.
    other_intervals: list of tuple of (int, int)
        The intervals that may overlap with ``interval``.

    Returns
    -------
    float
        1 if at least one of ``other_intervals`` overlaps with ``interval``,
        and 0 otherwise.
    """
    is_detected = any(
        _interval_overlap(interval, candidate) is not None
        for candidate in other_intervals
    )
    return 1 if is_detected else 0
def _overlap_reward(
    interval: _IntervalType,
    other_intervals: list[_IntervalType],
    delta: _DeltaType,
    gamma: _GammaType,
) -> float:
    """
    Reward the amount of overlap between ``interval`` and ``other_intervals``, see Equation (6).

    Parameters
    ----------
    interval: tuple of (int, int)
        The interval that is being scored.
    other_intervals: list of tuple of (int, int)
        The intervals that may overlap with ``interval``.
    delta: str or callable
        The positional bias used to weigh the overlapping positions.
    gamma: str or callable
        The penalization for overlapping with multiple intervals.

    Returns
    -------
    float
        The cardinality factor multiplied by the sum of the weighted overlap
        of ``interval`` with each of ``other_intervals``.
    """
    cardinality = _cardinality_factor(interval, other_intervals, gamma)

    summed_overlap = 0
    for candidate in other_intervals:
        shared = _interval_overlap(interval, candidate)
        summed_overlap += _omega(interval, shared, delta)

    return cardinality * summed_overlap
def _cardinality_factor(
    interval: _IntervalType, other_intervals: list[_IntervalType], gamma: _GammaType
) -> float:
    """
    Compute the penalization for overlapping with multiple intervals, see Equation (7).

    Parameters
    ----------
    interval: tuple of (int, int)
        The interval that is being scored.
    other_intervals: list of tuple of (int, int)
        The intervals that may overlap with ``interval``.
    gamma: str or callable
        The penalization approach, only used if more than one of
        ``other_intervals`` overlaps with ``interval``.

    Returns
    -------
    float
        1 if at most one interval overlaps with ``interval``, and otherwise
        the factor computed from ``gamma`` and the number of overlapping intervals.
    """
    overlap_count = sum(
        1
        for candidate in other_intervals
        if _interval_overlap(interval, candidate) is not None
    )
    if overlap_count > 1:
        return _gamma(gamma, overlap_count)
    # No fragmentation, hence no penalty.
    return 1
def _precision_interval(
    interval: _IntervalType,
    ground_truth_intervals: list[_IntervalType],
    delta: _DeltaType,
    gamma: _GammaType,
) -> float:
    """
    Compute the precision of a single predicted interval, see Equation (9).

    Parameters
    ----------
    interval: tuple of (int, int)
        The predicted interval that is being scored.
    ground_truth_intervals: list of tuple of (int, int)
        The ground truth intervals.
    delta: str or callable
        The positional bias.
    gamma: str or callable
        The penalization for overlapping with multiple intervals.

    Returns
    -------
    float
        The overlap reward of ``interval`` with respect to ``ground_truth_intervals``.
    """
    # The precision of a range consists of only the overlap reward.
    precision = _overlap_reward(
        interval=interval,
        other_intervals=ground_truth_intervals,
        delta=delta,
        gamma=gamma,
    )
    return precision
def _recall_interval(
    interval: _IntervalType,
    predicted_intervals: list[_IntervalType],
    alpha: float,
    delta: _DeltaType,
    gamma: _GammaType,
) -> float:
    """
    Compute the recall of a single ground truth interval, see Equation (4).

    Parameters
    ----------
    interval: tuple of (int, int)
        The ground truth interval that is being scored.
    predicted_intervals: list of tuple of (int, int)
        The predicted intervals.
    alpha: float
        The weight of the existence reward; the overlap reward is weighted
        by ``1 - alpha``.
    delta: str or callable
        The positional bias.
    gamma: str or callable
        The penalization for overlapping with multiple intervals.

    Returns
    -------
    float
        The weighted combination of the existence reward and the overlap reward.
    """
    existence_part = alpha * _existence_reward(interval, predicted_intervals)
    overlap_weight = 1 - alpha
    overlap_part = overlap_weight * _overlap_reward(
        interval, predicted_intervals, delta, gamma
    )
    return existence_part + overlap_part
class RangeBasedMetricBasePrecision(BinaryMetric, abc.ABC):
    """
    Base class for range-based metrics that rely on the range-based precision.

    The constructor validates and stores the positional bias ``delta`` and the
    cardinality penalty ``gamma``. Subclasses obtain the score via ``_precision``.

    Parameters
    ----------
    delta: str or callable, default='flat'
        The positional bias. Either one of ``'flat'``, ``'front'``, ``'back'``
        or ``'middle'``, or a callable of the form ``(int, int) -> float``.
    gamma: str or callable, default='reciprocal'
        The cardinality penalty. Either ``'one'`` or ``'reciprocal'``, or a
        callable of the form ``int -> float``.

    Raises
    ------
    ValueError
        If ``delta`` or ``gamma`` is a string but not one of the predefined options.
    TypeError
        If ``delta`` or ``gamma`` is neither a string nor a callable, or if a
        callable raises a ``TypeError`` when called with integer arguments.
    """

    delta: _DeltaType
    gamma: _GammaType

    def __init__(self, delta: _DeltaType = "flat", gamma: _GammaType = "reciprocal"):
        if isinstance(delta, str):
            if delta not in ["flat", "front", "back", "middle"]:
                raise ValueError(
                    f"Only predefined `delta` values are ['flat', 'front', 'back', 'middle'], received: '{delta}'"
                )
        elif callable(delta):
            try:
                # Probe with a valid position: positions passed to `delta` are
                # 1-based, so probing with 0 could reject a valid bias (e.g.,
                # one that divides by the position).
                delta(1, 10)
            except TypeError as error:
                raise TypeError(
                    "If 'delta' is a custom method, it should be of the form '(int, int) -> float'"
                ) from error
        else:
            raise TypeError("`delta` should be a string or a callable")

        if isinstance(gamma, str):
            if gamma not in ["one", "reciprocal"]:
                raise ValueError(
                    f"Only predefined `gamma` values are ['one', 'reciprocal'], received: '{gamma}'"
                )
        elif callable(gamma):
            try:
                # `gamma` is only applied if at least two intervals overlap.
                gamma(2)
            except TypeError as error:
                raise TypeError(
                    "If 'gamma' is a custom method, it should be of the form 'int -> float'"
                ) from error
        else:
            raise TypeError("`gamma` should be a string or a callable")

        self.delta = delta
        self.gamma = gamma

    def _precision(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        """
        Compute the range-based precision.

        Parameters
        ----------
        y_true: np.ndarray
            The ground truth labels.
        y_pred: np.ndarray
            The predicted labels.

        Returns
        -------
        float
            The average precision over all predicted intervals, or 0.0 if
            there are no predicted intervals.
        """
        gt_starts, gt_ends = make_intervals(y_true)
        pred_starts, pred_ends = make_intervals(y_pred)

        # Without any predicted interval there is nothing to average over;
        # return 0.0 instead of dividing by zero.
        nb_predicted_intervals = pred_starts.shape[0]
        if nb_predicted_intervals == 0:
            return 0.0

        # Add 1 to ends, because make_intervals returns closed intervals while the code here assumes half-open intervals
        ground_truth_intervals = list(zip(gt_starts, gt_ends + 1))
        precision_T = [
            _precision_interval(
                interval, ground_truth_intervals, self.delta, self.gamma
            )
            for interval in zip(pred_starts, pred_ends + 1)
        ]
        return sum(precision_T) / nb_predicted_intervals
class RangeBasedMetricBasePrecisionRecall(RangeBasedMetricBasePrecision, abc.ABC):
    """
    Base class for range-based metrics that rely on the range-based recall.

    Extends :py:class:`RangeBasedMetricBasePrecision` with the ``alpha``
    parameter and the ``_recall`` method.

    Parameters
    ----------
    alpha: float, default=0.5
        The weight of the existence reward in the recall of a ground truth
        interval; the overlap reward is weighted by ``1 - alpha``. Should be
        at least 0 and at most 1.
    delta: str or callable, default='flat'
        The positional bias, passed to the parent constructor.
    gamma: str or callable, default='reciprocal'
        The cardinality penalty, passed to the parent constructor.

    Raises
    ------
    TypeError
        If ``alpha`` is not numeric (booleans are rejected as well).
    ValueError
        If ``alpha`` is smaller than 0 or larger than 1.
    """

    alpha: float

    def __init__(
        self,
        alpha: float = 0.5,
        delta: _DeltaType = "flat",
        gamma: _GammaType = "reciprocal",
    ):
        super().__init__(delta, gamma)

        # A bool is an int in Python, hence it must be excluded explicitly.
        if not isinstance(alpha, (float, int)) or isinstance(alpha, bool):
            raise TypeError("`alpha` should be numeric")
        if not (0.0 <= alpha <= 1.0):
            raise ValueError("`alpha` should be at least 0 and at most 1")

        self.alpha = alpha

    def _recall(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        """
        Compute the range-based recall.

        Parameters
        ----------
        y_true: np.ndarray
            The ground truth labels.
        y_pred: np.ndarray
            The predicted labels.

        Returns
        -------
        float
            The average recall over all ground truth intervals, or 0.0 if
            there are no ground truth intervals.
        """
        gt_starts, gt_ends = make_intervals(y_true)
        pred_starts, pred_ends = make_intervals(y_pred)

        # Without any ground truth interval there is nothing to average over;
        # return 0.0 instead of dividing by zero.
        nb_ground_truth_intervals = gt_starts.shape[0]
        if nb_ground_truth_intervals == 0:
            return 0.0

        # Add 1 to ends, because make_intervals returns closed intervals while the code here assumes half-open intervals
        predicted_intervals = list(zip(pred_starts, pred_ends + 1))
        recall_T = [
            _recall_interval(
                interval, predicted_intervals, self.alpha, self.delta, self.gamma
            )
            for interval in zip(gt_starts, gt_ends + 1)
        ]
        return sum(recall_T) / nb_ground_truth_intervals
[docs]
class RangeBasedPrecision(RangeBasedMetricBasePrecision):
    """
    Computes the range-based precision score :cite:`tatbul2018precision`.

    The range-based precision computes a precision-score for each predicted
    anomalous range and then takes the average over all ranges. This precision-score
    consists of two parts: (1) the amount of overlap between the predicted range
    and the ground truth ranges, and (2) whether the predicted range overlaps with
    only one or multiple ground truth ranges. These components can be computed
    independently, and are multiplied to get a final precision-score for the range.

    Parameters
    ----------
    delta: str or callable, default='flat'
        Bias for the position of the predicted anomaly in the ground truth anomalous
        range. Valid options are:

        - ``'flat'``: Equal bias towards all positions in the ground truth anomalous range.
        - ``'front'``: Predictions that are near the front of the ground truth anomaly (i.e. early detection) have a higher weight.
        - ``'back'``: Predictions that are near the end of the ground truth anomaly (i.e. late detection) have a higher weight.
        - ``'middle'``: Predictions that are near the center of the ground truth anomaly have a higher weight.
        - Callable: A custom function to include positional bias, which takes as input two integers (a position within the anomalous range, and the total length of that range) and returns a float (the weight of that position).

    gamma: str or callable, default='reciprocal'
        Penalization approach for detecting multiple ranges with a single range. Valid options are:

        - ``'one'``: Fragmented detection should not be penalized.
        - ``'reciprocal'``: Weight fragmented detection of :math:`N` ranges with a single range by a factor of :math:`1/N`.
        - Callable: A custom function to penalize fragmented detection, which takes as input an integer (the number of detected ranges) and returns a float (the penalization factor).
    """

    def _compute(self, y_true: np.ndarray, y_pred: np.ndarray, **kwargs) -> float:
        # Additional keyword arguments are accepted but not used.
        return self._precision(y_true, y_pred)
[docs]
class RangeBasedRecall(RangeBasedMetricBasePrecisionRecall):
    """
    Computes the range-based recall score :cite:`tatbul2018precision`.

    The range-based recall computes a recall-score for each ground truth
    anomalous range and then takes the average over all ranges. This recall-score
    consists of three parts: (1) the amount of overlap between the ground truth range
    and the predicted ranges, (2) whether the ground truth range overlaps with
    only one or multiple predicted ranges, and (3) whether the final ground truth
    range is detected at all. Components (1) and (2) are computed independently
    and multiplied, of which the result is combined with component (3) through
    a convex combination to get a final recall-score for the ground truth range.

    Parameters
    ----------
    alpha: float, default=0.5
        The importance of detecting the events (even if it is only a single detected point)
        compared to detecting a large portion of the ground truth events. Should be at least 0
        and at most 1.

    delta: str or callable, default='flat'
        Bias for the position of the predicted anomaly in the ground truth anomalous
        range. Valid options are:

        - ``'flat'``: Equal bias towards all positions in the ground truth anomalous range.
        - ``'front'``: Predictions that are near the front of the ground truth anomaly (i.e. early detection) have a higher weight.
        - ``'back'``: Predictions that are near the end of the ground truth anomaly (i.e. late detection) have a higher weight.
        - ``'middle'``: Predictions that are near the center of the ground truth anomaly have a higher weight.
        - Callable: A custom function to include positional bias, which takes as input two integers (a position within the anomalous range, and the total length of that range) and returns a float (the weight of that position).

    gamma: str or callable, default='reciprocal'
        Penalization approach for detecting multiple ranges with a single range. Valid options are:

        - ``'one'``: Fragmented detection should not be penalized.
        - ``'reciprocal'``: Weight fragmented detection of :math:`N` ranges with a single range by a factor of :math:`1/N`.
        - Callable: A custom function to penalize fragmented detection, which takes as input an integer (the number of detected ranges) and returns a float (the penalization factor).
    """

    def _compute(self, y_true: np.ndarray, y_pred: np.ndarray, **kwargs) -> float:
        # Additional keyword arguments are accepted but not used.
        return self._recall(y_true, y_pred)
[docs]
class RangeBasedFBeta(RangeBasedMetricBasePrecisionRecall, FBetaBase):
    """
    Computes the range-based :math:`F_\\beta` score :cite:`tatbul2018precision`.

    The range-based :math:`F_\\beta`-score equals the harmonic mean of the range-based
    precision and range-based recall. The metrics take into account three parts: (1) the
    amount of overlap between the ground truth ranges and the predicted ranges, (2) whether
    there is fragmented detection or not, and (3) whether the ground truth ranges are
    detected at all.

    Parameters
    ----------
    beta: int, float, default=1
        Desired beta parameter.

    alpha: float, default=0.5
        The importance of detecting the events (even if it is only a single detected point)
        compared to detecting a large portion of the ground truth events. Should be at least 0
        and at most 1.

    delta: str or callable, default='flat'
        Bias for the position of the predicted anomaly in the ground truth anomalous
        range. Valid options are:

        - ``'flat'``: Equal bias towards all positions in the ground truth anomalous range.
        - ``'front'``: Predictions that are near the front of the ground truth anomaly (i.e. early detection) have a higher weight.
        - ``'back'``: Predictions that are near the end of the ground truth anomaly (i.e. late detection) have a higher weight.
        - ``'middle'``: Predictions that are near the center of the ground truth anomaly have a higher weight.
        - Callable: A custom function to include positional bias, which takes as input two integers (a position within the anomalous range, and the total length of that range) and returns a float (the weight of that position).

    gamma: str or callable, default='reciprocal'
        Penalization approach for detecting multiple ranges with a single range. Valid options are:

        - ``'one'``: Fragmented detection should not be penalized.
        - ``'reciprocal'``: Weight fragmented detection of :math:`N` ranges with a single range by a factor of :math:`1/N`.
        - Callable: A custom function to penalize fragmented detection, which takes as input an integer (the number of detected ranges) and returns a float (the penalization factor).

    See also
    --------
    RangeBasedPrecision: Compute the range-based precision score.
    RangeBasedRecall: Compute the range-based recall score.
    """

    def __init__(
        self,
        beta: float | int = 1.0,
        alpha: float = 0.5,
        delta: _DeltaType = "flat",
        gamma: _GammaType = "reciprocal",
    ):
        # Both parents take different arguments, so each constructor is
        # called explicitly instead of through super().
        RangeBasedMetricBasePrecisionRecall.__init__(self, alpha, delta, gamma)
        FBetaBase.__init__(self, beta)

    def _compute(self, y_true: np.ndarray, y_pred: np.ndarray, **kwargs) -> float:
        # Additional keyword arguments are accepted but not used.
        return self._f_score(
            precision=self._precision(y_true, y_pred),
            recall=self._recall(y_true, y_pred),
        )