# Source code for dtaianomaly.evaluation.affiliation_metrics

import math
from itertools import groupby
from operator import itemgetter

import numpy as np

from dtaianomaly.evaluation._common import FBetaBase
from dtaianomaly.evaluation.metrics import BinaryMetric

###############################################################################
# AFFILIATION METRICS #########################################################
###############################################################################


def _convert_vector_to_events(vector):
    """
    Turn a binary vector into the list of anomalous events it encodes.

    Every strictly positive entry marks an anomalous instance. An instance at
    index i stands for the half-open interval [i, i+1), so a maximal run of
    positive entries covering indices a..b becomes the event (a, b + 1).

    :param vector: a sequence of elements belonging to {0, 1}
    :return: a list of couples (start, stop), one per maximal run of
    positive entries, in increasing order
    """
    events = []
    run_start = None  # index at which the currently open run began, if any

    for position, value in enumerate(vector):
        if value > 0:
            if run_start is None:
                run_start = position
        elif run_start is not None:
            # First non-positive entry after a run: it is exactly the
            # exclusive stop of that run.
            events.append((run_start, position))
            run_start = None

    # A run that reaches the end of the vector stops one past the last index.
    if run_start is not None:
        events.append((run_start, position + 1))

    return events


def _test_events(events):
    """
    Verify the validity of the input events.

    The checks are performed in this order: container type, element type,
    element length, start < stop for each event, and finally that consecutive
    events are strictly separated (stop of one event < start of the next).

    Subclasses of ``list`` and ``tuple`` (e.g. named tuples) are accepted.

    :param events: list of events, each represented by a couple (start, stop)
    :return: None
    :raises TypeError: if `events` is not a list, or if one of its elements
    is not a tuple
    :raises ValueError: if an event is not a couple, if an event does not
    satisfy start < stop, or if the events are not disjoint and ordered
    """
    if not isinstance(events, list):
        raise TypeError("Input `events` should be a list of couples")
    if not all(isinstance(event, tuple) for event in events):
        raise TypeError("Input `events` should be a list of tuples")
    if not all(len(event) == 2 for event in events):
        raise ValueError("Input `events` should be a list of couples (start, stop)")
    if not all(start < stop for start, stop in events):
        raise ValueError(
            "Input `events` should be a list of couples (start, stop) with start < stop"
        )
    # Pair every event with its successor; the comparison is strict, so two
    # events that merely touch (stop == next start) are rejected as well.
    if not all(
        current[1] < following[0] for current, following in zip(events, events[1:])
    ):
        raise ValueError("Couples of input `events` should be disjoint and ordered")


def _compute_affiliation_metrics(
    y_true: np.ndarray, y_pred: np.ndarray
) -> (float, float):
    """
    Compute the affiliation-based precision and recall of a prediction.

    Both binary vectors are converted to events and validated. The ground
    truth events define the affiliation zones over the range
    (0, y_true.shape[0]); the predicted events are cut along these zones and
    an individual precision and recall is computed per zone.

    :param y_true: binary ground truth vector
    :param y_pred: binary prediction vector
    :return: the couple (average precision, average recall). The precision
    average ignores the zones for which it is undefined (NaN) and is itself
    NaN when it is undefined in every zone.
    :raises ValueError: if the ground truth contains no event
    """
    ground_truth_events = _convert_vector_to_events(y_true)
    _test_events(ground_truth_events)

    predicted_events = _convert_vector_to_events(y_pred)
    _test_events(predicted_events)

    if not ground_truth_events:
        raise ValueError("Input `events_gt` should have at least one event")

    zones = _get_all_E_gt_func(ground_truth_events, (0, y_true.shape[0]))
    predictions_per_zone = _affiliation_partition(predicted_events, zones)

    # One (predictions in zone, ground truth event, zone) triple per zone.
    triples = list(zip(predictions_per_zone, ground_truth_events, zones))

    precisions = [
        _affiliation_precision_proba(in_zone, event, zone)
        for in_zone, event, zone in triples
    ]
    recalls = [
        _affiliation_recall_proba(in_zone, event, zone)
        for in_zone, event, zone in triples
    ]

    # Precision is NaN in zones without any prediction: average over the
    # defined ones only, and fall back to NaN when none is defined.
    defined_precisions = _len_wo_nan(precisions)
    if defined_precisions > 0:
        average_precision = _sum_wo_nan(precisions) / defined_precisions
    else:
        average_precision = precisions[0]  # math.nan

    average_recall = sum(recalls) / len(recalls)

    return average_precision, average_recall


class AffiliationPrecision(BinaryMetric):
    """
    Computes the affiliation-based precision score :cite:`huet2022local`. The
    affiliation-metrics will first divide the time domain into a number of
    so-called affiliations: subsequences that are closest to the ground truth
    anomaly events. These affiliations do not have a fixed size. Then, the
    precision is computed within each affiliation as the distance from the
    predicted anomalous events to the ground truth event. The final precision
    then equals the average precision across all the affiliations.
    """

    def _compute(self, y_true: np.ndarray, y_pred: np.ndarray, **kwargs) -> float:
        # Only the precision half of the (precision, recall) couple is needed.
        affiliation_precision, _ = _compute_affiliation_metrics(y_true, y_pred)
        return affiliation_precision
class AffiliationRecall(BinaryMetric):
    """
    Computes the affiliation-based recall score :cite:`huet2022local`. The
    affiliation-metrics will first divide the time domain into a number of
    so-called affiliations: subsequences that are closest to the ground truth
    anomaly events. These affiliations do not have a fixed size. Then, the
    recall is computed within each affiliation as the distance from the
    ground truth anomalous event to the closest predicted anomalies in that
    affiliation. The final recall then equals the average recall across all
    the affiliations.
    """

    def _compute(self, y_true: np.ndarray, y_pred: np.ndarray, **kwargs) -> float:
        # Only the recall half of the (precision, recall) couple is needed.
        _, affiliation_recall = _compute_affiliation_metrics(y_true, y_pred)
        return affiliation_recall
class AffiliationFBeta(BinaryMetric, FBetaBase):
    """
    Computes the affiliation-based :math:`F_\\beta` score :cite:`huet2022local`.
    The affiliation-metrics will first divide the time domain into a number of
    so-called affiliations: subsequences that are closest to the ground truth
    anomaly events. These affiliations do not have a fixed size. The precision
    is computed as the distance of the predicted anomalies to the ground truth
    event, and the recall is computed as the distance of the ground truth
    anomaly to the predicted anomalies. These precision and recall scores
    within each affiliation are then averaged. The :math:`F_\\beta` score is
    the harmonic mean of this average precision and recall.

    Parameters
    ----------
    beta: int, float, default=1
        Desired beta parameter.

    See also
    --------
    AffiliationPrecision: Compute the affiliation-based Precision score.
    AffiliationRecall: Compute the affiliation-based Recall score.
    """

    def __init__(self, beta: (float, int) = 1) -> None:
        super().__init__(beta)

    def _compute(self, y_true: np.ndarray, y_pred: np.ndarray, **kwargs) -> float:
        affiliation_precision, affiliation_recall = _compute_affiliation_metrics(
            y_true, y_pred
        )
        # Combining the two averages is delegated to the inherited `_f_score`.
        return self._f_score(precision=affiliation_precision, recall=affiliation_recall)
###############################################################################
# AFFILIATION ZONES ###########################################################
###############################################################################


def _affiliation_partition(Is, E_gt):
    """
    Cut the events into the affiliation zones
    The presentation given here is from the ground truth point of view,
    but it is also used in the reversed direction in the main function.

    :param Is: events as a list of couples
    :param E_gt: range of the affiliation zones
    :return: a list of list of intervals (each interval represented by either
    a couple or None for empty interval). The outer list is indexed by each
    affiliation zone of `E_gt`. The inner list is indexed by the events of `Is`.
    """
    out = [None] * len(E_gt)
    for j in range(len(E_gt)):
        E_gt_j = E_gt[j]
        # True where the end point of the predicted I is before the begin of E
        discarded_idx_before = [
            I[1] < E_gt_j[0] for I in Is
        ]
        # True where the start of the predicted I is after the end of E
        discarded_idx_after = [
            I[0] > E_gt_j[1] for I in Is
        ]
        kept_index = [
            not (a or b) for a, b in zip(discarded_idx_before, discarded_idx_after)
        ]
        # NOTE(review): `kept_index` is not used as a filter here (the
        # comprehension has no condition), so `Is_j` holds every element of
        # `Is`. Events that do not overlap the zone become None through
        # `_interval_intersection`, which keeps the inner list indexed by the
        # events of `Is`, as stated in the docstring.
        Is_j = [x for x, y in zip(Is, kept_index)]
        out[j] = [_interval_intersection(I, E_gt[j]) for I in Is_j]
    return out


def _t_start(j, Js, Trange):
    """
    Helper for `E_gt_func`

    For j < len(Js) this is the start of event j. For j == len(Js) it is the
    mirror image of the last stop with respect to max(Trange), so that the
    middle of the last stop and this value is max(Trange).

    :param j: index from 0 to len(Js) (included) on which to get the start
    :param Js: ground truth events, as a list of couples
    :param Trange: range of the series where Js is included
    :return: generalized start such that the middle of t_start and t_stop
    always gives the affiliation zone
    """
    b = max(Trange)
    n = len(Js)
    if j == n:
        return 2 * b - _t_stop(n - 1, Js, Trange)
    else:
        return Js[j][0]


def _t_stop(j, Js, Trange):
    """
    Helper for `E_gt_func`

    For j >= 0 this is the stop of event j. For j == -1 it is the mirror
    image of the first start with respect to min(Trange), so that the middle
    of this value and the first start is min(Trange).

    :param j: index from 0 to len(Js) (included) on which to get the stop
    :param Js: ground truth events, as a list of couples
    :param Trange: range of the series where Js is included
    :return: generalized stop such that the middle of t_start and t_stop
    always gives the affiliation zone
    """
    if j == -1:
        a = min(Trange)
        return 2 * a - _t_start(0, Js, Trange)
    else:
        return Js[j][1]


def _E_gt_func(j, Js, Trange):
    """
    Get the affiliation zone of element j of the ground truth

    The zone runs from the middle of (previous stop, own start) to the middle
    of (own stop, next start), using the generalized start/stop at the borders.

    :param j: index from 0 to len(Js) (excluded) on which to get the zone
    :param Js: ground truth events, as a list of couples
    :param Trange: range of the series where Js is included, can
    be (-math.inf, math.inf) for distance measures
    :return: affiliation zone of element j of the ground truth represented
    as a couple
    """
    range_left = (_t_stop(j - 1, Js, Trange) + _t_start(j, Js, Trange)) / 2
    range_right = (_t_stop(j, Js, Trange) + _t_start(j + 1, Js, Trange)) / 2
    return range_left, range_right


def _get_all_E_gt_func(Js, Trange):
    """
    Get the affiliation partition from the ground truth point of view

    :param Js: ground truth events, as a list of couples
    :param Trange: range of the series where Js is included, can
    be (-math.inf, math.inf) for distance measures
    :return: affiliation partition of the events, one zone per element of Js
    """
    # E_gt is the limit of affiliation/attraction for each ground truth event
    return [_E_gt_func(j, Js, Trange) for j in range(len(Js))]


###############################################################################
# INTEGRAL INTERVAL #############################################################
###############################################################################


def _interval_length(J):
    """Length of an interval (0 for the empty interval None)."""
    return 0 if J is None else J[1] - J[0]


def _sum_interval_lengths(Is):
    """Sum of length of the intervals (None entries count for 0)."""
    return sum([_interval_length(I) for I in Is])


def _interval_intersection(I, J):
    """
    Intersection between two intervals I and J

    :return: the intersection as a couple, or None when one of the inputs is
    None or when the intersection has zero or negative length
    """
    if I is None or J is None:
        return None

    I_inter_J = (max(I[0], J[0]), min(I[1], J[1]))
    if I_inter_J[0] >= I_inter_J[1]:
        return None
    else:
        return I_inter_J


def _interval_subset(I, J):
    """Checks whether I is a subset of J"""
    return (I[0] >= J[0]) and (I[1] <= J[1])


def _cut_into_three_func(I, J):
    """
    Cut an interval I into a partition of 3 subsets:
        the elements before J,
        the elements belonging to J,
        and the elements after J

    :return: the triple (I_before, I_inter_J, I_after), where each part is a
    couple or None when empty. (None, None, None) when I is None.
    :raises ValueError: if none of the considered configurations applies
    """
    if I is None:
        return None, None, None

    I_inter_J = _interval_intersection(I, J)
    if I == I_inter_J:
        # I is entirely inside J
        I_before = None
        I_after = None
    elif I[1] <= J[0]:
        # I is entirely before J
        I_before = I
        I_after = None
    elif I[0] >= J[1]:
        # I is entirely after J
        I_before = None
        I_after = I
    elif (I[0] <= J[0]) and (I[1] >= J[1]):
        # I sticks out on both sides of J
        I_before = (I[0], I_inter_J[0])
        I_after = (I_inter_J[1], I[1])
    elif I[0] <= J[0]:
        # I sticks out on the left only
        I_before = (I[0], I_inter_J[0])
        I_after = None
    elif I[1] >= J[1]:
        # I sticks out on the right only
        I_before = None
        I_after = (I_inter_J[1], I[1])
    else:
        raise ValueError("unexpected unconsidered case")
    return I_before, I_inter_J, I_after


def _get_pivot_j(I, J):
    """
    Get the single point of J that is the closest to I, called 'pivot' here,
    with the requirement that I should be outside J

    :raises ValueError: if I is neither entirely before nor entirely after J
    """
    if max(I) <= min(J):
        return min(J)
    elif min(I) >= max(J):
        return max(J)
    else:
        raise ValueError("I and J should have a void intersection")


def _integral_mini_interval(I, J):
    """
    In the specific case where interval I is located outside J,
    integral of distance from x to J over the interval x \\in I.
    This is the *integral* i.e. the sum.
    It's not the mean (not divided by the length of I yet)

    Computed as the length of I times the distance between the pivot of J
    and the middle of I. Returns 0 when I is None.
    """
    if I is None:
        return 0

    j_pivot = _get_pivot_j(I, J)
    a = min(I)
    b = max(I)
    return (b - a) * abs((j_pivot - (a + b) / 2))


def _integral_interval_distance(I, J):
    """
    For any non-empty intervals I, J, compute the
    integral of distance from x to J over the interval x \\in I.
    This is the *integral* i.e. the sum.
    It's not the mean (not divided by the length of I yet)
    The interval I can intersect J or not
    """
    # I and J are single intervals (not generic sets)
    # I is a predicted interval in the range of affiliation of J

    def f(I_cut):
        return _integral_mini_interval(I_cut, J)

    # I_middle is fully included into J, so its distance to J is always 0
    def f0(_):
        return 0

    cut_into_three = _cut_into_three_func(I, J)
    # Distance for now, not the mean:
    # Distance left: Between cut_into_three[0] and the point min(J)
    d_left = f(cut_into_three[0])
    # Distance middle: Between cut_into_three[1] = I inter J, and J
    d_middle = f0(cut_into_three[1])
    # Distance right: Between cut_into_three[2] and the point max(J)
    d_right = f(cut_into_three[2])
    # It's an integral so summable
    return d_left + d_middle + d_right


def _integral_mini_interval_P_CDFmethod__min_piece(I, J, E):
    """
    Helper of `integral_mini_interval_Pprecision_CDFmethod`
    In the specific case where interval I is located outside J,
    compute the integral $\\int_{d_min}^{d_max} \\min(m, x) dx$, with:
    - m the smallest distance from J to E,
    - d_min the smallest distance d(x, J) from x \\in I to J
    - d_max the largest distance d(x, J) from x \\in I to J

    :raises ValueError: if I intersects J, if J is not included in E, or if
    I is not included in E
    """
    if _interval_intersection(I, J) is not None:
        raise ValueError("I and J should have a void intersection")
    if not _interval_subset(J, E):
        raise ValueError("J should be included in E")
    if not _interval_subset(I, E):
        raise ValueError("I should be included in E")

    e_min = min(E)
    j_min = min(J)
    j_max = max(J)
    e_max = max(E)
    i_min = min(I)
    i_max = max(I)

    # I is on one side of J, so exactly one argument of each max is >= 0
    d_min = max(i_min - j_max, j_min - i_max)
    d_max = max(i_max - j_max, j_min - i_min)
    m = min(j_min - e_min, e_max - j_max)
    # A / 2 integrates x over the part of [d_min, d_max] below m,
    # m * B integrates the constant m over the part above m
    A = min(d_max, m) ** 2 - min(d_min, m) ** 2
    B = max(d_max, m) - max(d_min, m)
    C = (1 / 2) * A + m * B
    return C


def _integral_mini_interval_Pprecision_CDFmethod(I, J, E):
    """
    Integral of the probability of distances over the interval I.
    In the specific case where interval I is located outside J,
    compute the integral $\\int_{x \\in I} Fbar(dist(x,J)) dx$.
    This is the *integral* i.e. the sum (not the mean)
    """
    integral_min_piece = _integral_mini_interval_P_CDFmethod__min_piece(I, J, E)

    e_min = min(E)
    j_min = min(J)
    j_max = max(J)
    e_max = max(E)
    i_min = min(I)
    i_max = max(I)
    d_min = max(i_min - j_max, j_min - i_max)
    d_max = max(i_max - j_max, j_min - i_min)
    integral_linear_piece = (1 / 2) * (d_max**2 - d_min**2)
    integral_remaining_piece = (j_max - j_min) * (i_max - i_min)

    DeltaI = i_max - i_min
    DeltaE = e_max - e_min

    # |I| minus the three integrated pieces, normalised by the length of E
    return DeltaI - (1 / DeltaE) * (
        integral_min_piece + integral_linear_piece + integral_remaining_piece
    )


def _integral_interval_probaCDF_precision(I, J, E):
    """
    Integral of the probability of distances over the interval I.
    Compute the integral $\\int_{x \\in I} Fbar(dist(x,J)) dx$.
    This is the *integral* i.e. the sum (not the mean)
    """

    # I and J are single intervals (not generic sets)
    def f(I_cut):
        return (
            0
            if I_cut is None
            else _integral_mini_interval_Pprecision_CDFmethod(I_cut, J, E)
        )

    # If I_middle is fully included into J, it is
    # integral of 1 on the interval I_middle, so it's |I_middle|
    def f0(I_middle):
        return 0 if I_middle is None else max(I_middle) - min(I_middle)

    cut_into_three = _cut_into_three_func(I, J)
    # Distance for now, not the mean:
    # Distance left: Between cut_into_three[0] and the point min(J)
    d_left = f(cut_into_three[0])
    # Distance middle: Between cut_into_three[1] = I inter J, and J
    d_middle = f0(cut_into_three[1])
    # Distance right: Between cut_into_three[2] and the point max(J)
    d_right = f(cut_into_three[2])
    # It's an integral so summable
    return d_left + d_middle + d_right


def _cut_J_based_on_mean_func(J, e_mean):
    """
    Helper function for the recall.
    Partition J into two intervals: before and after e_mean
    (e_mean represents the center element of E the zone of affiliation)

    :return: the couple (J_before, J_after), each a couple or None when empty
    """
    if J is None:
        J_before = None
        J_after = None
    elif e_mean >= max(J):
        J_before = J
        J_after = None
    elif e_mean <= min(J):
        J_before = None
        J_after = J
    else:  # e_mean is across J
        J_before = (min(J), e_mean)
        J_after = (e_mean, max(J))

    return J_before, J_after


def _integral_mini_interval_Precall_CDFmethod(I, J, E):
    """
    Integral of the probability of distances over the interval J.
    In the specific case where interval J is located outside I,
    compute the integral $\\int_{y \\in J} Fbar_y(dist(y,I)) dy$.
    This is the *integral* i.e. the sum (not the mean)

    :return: the integral, or 0 when the point of I closest to J lies on or
    outside the border of E
    :raises ValueError: if J is not entirely on one side of I
    """
    # The interval J should be located outside I
    # (so it's either the left piece or the right piece w.r.t I)
    i_pivot = _get_pivot_j(J, I)
    e_min = min(E)
    e_max = max(E)
    e_mean = (e_min + e_max) / 2

    # If i_pivot is outside E (it's possible), then
    # the distance is worse than any random element within E,
    # so we set the recall to 0
    if i_pivot <= min(E):
        return 0
    elif i_pivot >= max(E):
        return 0
    # Otherwise, we have at least i_pivot in E and so d < M so min(d,M)=d

    # First split J at the centre of E ...
    cut_J_based_on_e_mean = _cut_J_based_on_mean_func(J, e_mean)
    J_before = cut_J_based_on_e_mean[0]
    J_after = cut_J_based_on_e_mean[1]

    # ... then split the left half at the middle of (e_min, i_pivot) ...
    iemin_mean = (e_min + i_pivot) / 2
    cut_Jbefore_based_on_iemin_mean = _cut_J_based_on_mean_func(J_before, iemin_mean)
    # before e_mean and closer to e_min than i_pivot ~ J_before_before
    J_before_closeE = cut_Jbefore_based_on_iemin_mean[
        0
    ]
    # before e_mean and closer to i_pivot than e_min ~ J_before_after
    J_before_closeI = cut_Jbefore_based_on_iemin_mean[
        1
    ]

    # ... and the right half at the middle of (e_max, i_pivot).
    iemax_mean = (e_max + i_pivot) / 2
    cut_Jafter_based_on_iemax_mean = _cut_J_based_on_mean_func(J_after, iemax_mean)
    # after e_mean and closer to i_pivot than e_max ~ J_after_before
    J_after_closeI = cut_Jafter_based_on_iemax_mean[
        0
    ]
    # after e_mean and closer to e_max than i_pivot ~ J_after_after
    J_after_closeE = cut_Jafter_based_on_iemax_mean[
        1
    ]

    # Bounds of the four pieces. An empty piece gets NaN bounds, so that its
    # contribution below is NaN and is dropped by `_sum_wo_nan`.
    if J_before_closeE is not None:
        j_before_before_min = min(J_before_closeE)  # == min(J)
        j_before_before_max = max(J_before_closeE)
    else:
        j_before_before_min = math.nan
        j_before_before_max = math.nan

    if J_before_closeI is not None:
        j_before_after_min = min(J_before_closeI)  # == j_before_before_max if existing
        j_before_after_max = max(J_before_closeI)  # == max(J_before)
    else:
        j_before_after_min = math.nan
        j_before_after_max = math.nan

    if J_after_closeI is not None:
        j_after_before_min = min(J_after_closeI)  # == min(J_after)
        j_after_before_max = max(J_after_closeI)
    else:
        j_after_before_min = math.nan
        j_after_before_max = math.nan

    if J_after_closeE is not None:
        j_after_after_min = min(J_after_closeE)  # == j_after_before_max if existing
        j_after_after_max = max(J_after_closeE)  # == max(J)
    else:
        j_after_after_min = math.nan
        j_after_after_max = math.nan

    # Layout of the four pieces of J and of their bounds
    # (with `b` for before and `a` for after in the variable names):
    #
    #   <-- J_before_closeE --> <-- J_before_closeI --> <-- J_after_closeI --> <-- J_after_closeE -->
    #   j_bb_min       j_bb_max j_ba_min       j_ba_max j_ab_min      j_ab_max j_aa_min      j_aa_max
    #
    # For a point t of J, with m = min(t-e_min, e_max-t) and d = |i_pivot-t|,
    # the quantity integrated over each piece [tA, tB] is min(d,m) + d.
    #
    # i_pivot after J (d = i_pivot-t):
    #   J_before_closeE (t before e_mean, m = t-e_min): min(d,m) = t-e_min
    #       \int min(d,m)dt = t^2/2-e_min*t ; \int d dt = i_pivot*t-t^2/2
    #       \int (min(d,m)+d)dt = (i_pivot-e_min)*t
    #       over the piece: (i_pivot-e_min)*(tB-tA)
    #   J_before_closeI (t before e_mean, m = t-e_min): min(d,m) = i_pivot-t
    #       \int (min(d,m)+d)dt = 2*i_pivot*t-t^2
    #       over the piece: 2*i_pivot*(tB-tA) - (tB^2 - tA^2)
    #   J_after_closeI (t after e_mean, m = e_max-t): min(d,m) = i_pivot-t
    #       \int (min(d,m)+d)dt = 2*i_pivot*t-t^2
    #       over the piece: 2*i_pivot*(tB-tA) - (tB^2 - tA^2)
    #   J_after_closeE (t after e_mean, m = e_max-t): min(d,m) = e_max-t
    #       \int min(d,m)dt = e_max*t-t^2/2 ; \int d dt = i_pivot*t-t^2/2
    #       \int (min(d,m)+d)dt = (e_max+i_pivot)*t-t^2
    #       over the piece: (e_max+i_pivot)*(tB-tA) - (tB^2 - tA^2)
    #
    # i_pivot before J (d = t-i_pivot):
    #   J_before_closeE (t before e_mean, m = t-e_min): min(d,m) = t-e_min
    #       \int min(d,m)dt = t^2/2-e_min*t ; \int d dt = t^2/2-i_pivot*t
    #       \int (min(d,m)+d)dt = t^2-(e_min+i_pivot)*t
    #       over the piece: (tB^2 - tA^2) - (e_min+i_pivot)*(tB-tA)
    #   J_before_closeI (t before e_mean, m = t-e_min): min(d,m) = t-i_pivot
    #       \int (min(d,m)+d)dt = t^2-2*i_pivot*t
    #       over the piece: (tB^2 - tA^2) - 2*i_pivot*(tB-tA)
    #   J_after_closeI (t after e_mean, m = e_max-t): min(d,m) = t-i_pivot
    #       \int (min(d,m)+d)dt = t^2-2*i_pivot*t
    #       over the piece: (tB^2 - tA^2) - 2*i_pivot*(tB-tA)
    #   J_after_closeE (t after e_mean, m = e_max-t): min(d,m) = e_max-t
    #       \int min(d,m)dt = e_max*t-t^2/2 ; \int d dt = t^2/2-i_pivot*t
    #       \int (min(d,m)+d)dt = (e_max-i_pivot)*t
    #       over the piece: (e_max-i_pivot)*(tB-tA)

    if i_pivot >= max(J):
        # (i_pivot-e_min)*(tB-tA)
        part1_before_closeE = (i_pivot - e_min) * (
            j_before_before_max - j_before_before_min
        )
        # 2*i_pivot*(tB-tA) - (tB^2 - tA^2)
        part2_before_closeI = 2 * i_pivot * (
            j_before_after_max - j_before_after_min
        ) - (
            j_before_after_max**2 - j_before_after_min**2
        )
        # 2*i_pivot*(tB-tA) - (tB^2 - tA^2)
        part3_after_closeI = 2 * i_pivot * (j_after_before_max - j_after_before_min) - (
            j_after_before_max**2 - j_after_before_min**2
        )
        # (e_max+i_pivot)*(tB-tA) - (tB^2 - tA^2)
        part4_after_closeE = (e_max + i_pivot) * (
            j_after_after_max - j_after_after_min
        ) - (
            j_after_after_max**2 - j_after_after_min**2
        )
        out_parts = [
            part1_before_closeE,
            part2_before_closeI,
            part3_after_closeI,
            part4_after_closeE,
        ]

    elif i_pivot <= min(J):
        # (tB^2 - tA^2) - (e_min+i_pivot)*(tB-tA)
        part1_before_closeE = (j_before_before_max**2 - j_before_before_min**2) - (
            e_min + i_pivot
        ) * (
            j_before_before_max - j_before_before_min
        )
        # (tB^2 - tA^2) - 2*i_pivot*(tB-tA)
        part2_before_closeI = (
            j_before_after_max**2 - j_before_after_min**2
        ) - 2 * i_pivot * (
            j_before_after_max - j_before_after_min
        )
        # (tB^2 - tA^2) - 2*i_pivot*(tB-tA)
        part3_after_closeI = (
            j_after_before_max**2 - j_after_before_min**2
        ) - 2 * i_pivot * (
            j_after_before_max - j_after_before_min
        )
        # (e_max-i_pivot)*(tB-tA)
        part4_after_closeE = (e_max - i_pivot) * (
            j_after_after_max - j_after_after_min
        )
        out_parts = [
            part1_before_closeE,
            part2_before_closeI,
            part3_after_closeI,
            part4_after_closeE,
        ]
    else:
        raise ValueError("The i_pivot should be outside J")

    # integral on all J, i.e. sum of the disjoint parts
    # (NaN parts, coming from empty pieces, are skipped)
    out_integral_min_dm_plus_d = _sum_wo_nan(
        out_parts
    )

    # We have for each point t of J:
    # \bar{F}_{t, recall}(d) = 1 - (1/|E|) * (min(d,m) + d)
    # Since t is a single-point here, and we are in the case where i_pivot is inside E.
    # The integral is then given by:
    # C = \int_{t \in J} \bar{F}_{t, recall}(D(t)) dt
    #   = \int_{t \in J} 1 - (1/|E|) * (min(d,m) + d) dt
    #   = |J| - (1/|E|) * [\int_{t \in J} (min(d,m) + d) dt]
    #   = |J| - (1/|E|) * out_integral_min_dm_plus_d
    DeltaJ = max(J) - min(J)
    DeltaE = max(E) - min(E)
    C = DeltaJ - (1 / DeltaE) * out_integral_min_dm_plus_d

    return C


def _integral_interval_probaCDF_recall(I, J, E):
    """
    Integral of the probability of distances over the interval J.
    Compute the integral $\\int_{y \\in J} Fbar_y(dist(y,I)) dy$.
    This is the *integral* i.e. the sum (not the mean)
    """

    # I and J are single intervals (not generic sets)
    # E is the outside affiliation interval of J (even for recall!)
    # (in particular J \subset E)
    #
    # J is the portion of the ground truth affiliated to I
    # I is a predicted interval (can be outside E possibly since it's recall)
    def f(J_cut):
        return (
            0
            if J_cut is None
            else _integral_mini_interval_Precall_CDFmethod(I, J_cut, E)
        )

    # If J_middle is fully included into I, it is
    # integral of 1 on the interval J_middle, so it's |J_middle|
    def f0(J_middle):
        return 0 if J_middle is None else max(J_middle) - min(J_middle)

    # it's J that we cut into 3, depending on the position w.r.t I
    # since we integrate over J this time.
    cut_into_three = _cut_into_three_func(
        J, I
    )
    #
    # Distance for now, not the mean:
    # Distance left: Between cut_into_three[0] and the point min(I)
    d_left = f(cut_into_three[0])
    # Distance middle: Between cut_into_three[1] = J inter I, and I
    d_middle = f0(cut_into_three[1])
    # Distance right: Between cut_into_three[2] and the point max(I)
    d_right = f(cut_into_three[2])
    # It's an integral so summable
    return d_left + d_middle + d_right


###############################################################################
# SINGLE GROUND TRUTH EVENTS ##################################################
###############################################################################


def _affiliation_precision_proba(Is, J, E):
    """
    Compute the individual precision probability from Is to a single ground truth J

    :param Is: predicted intervals within the zone (couples or None)
    :param J: the ground truth event of the zone, as a couple
    :param E: the affiliation zone of J, as a couple
    :return: the summed precision integrals divided by the total length of
    `Is`, or math.nan when every element of `Is` is None
    """
    if all([I is None for I in Is]):  # no prediction in the current area
        return math.nan  # undefined
    return sum(
        [_integral_interval_probaCDF_precision(I, J, E) for I in Is]
    ) / _sum_interval_lengths(Is)


def _affiliation_recall_proba(Is, J, E):
    """
    Compute the individual recall probability from a single ground truth J to Is

    :param Is: predicted intervals within the zone (couples or None)
    :param J: the ground truth event of the zone, as a couple
    :param E: the affiliation zone of J, as a couple
    :return: the summed recall integrals divided by the length of J, or 0
    when `Is` holds no actual interval
    """
    Is = [I for I in Is if I is not None]  # filter possible None in Is
    if len(Is) == 0:  # there is no prediction in the current area
        return 0
    # here from the point of view of the predictions
    E_gt_recall = _get_all_E_gt_func(
        Is, E
    )
    # partition of J depending of proximity with Is
    Js = _affiliation_partition(
        [J], E_gt_recall
    )
    # The `J` bound inside the comprehension is local to it (each element of
    # `Js` is a one-element list); the final division uses the parameter `J`.
    return sum(
        [_integral_interval_probaCDF_recall(I, J[0], E) for I, J in zip(Is, Js)]
    ) / _interval_length(J)


###############################################################################
# GENERICS ####################################################################
###############################################################################


def _sum_wo_nan(vec):
    """Sum of elements, ignoring math.isnan ones."""
    return sum([e for e in vec if not math.isnan(e)])


def _len_wo_nan(vec):
    """Count of elements, ignoring math.isnan ones."""
    return len([e for e in vec if not math.isnan(e)])