Source code for dtaianomaly.workflow._JobBasedWorkflow

import multiprocessing
import os
import tempfile
import time
import tracemalloc
import warnings
from functools import partial

import numpy as np
import pandas as pd

from dtaianomaly.anomaly_detection import BaseDetector, Supervision
from dtaianomaly.data import DataSet
from dtaianomaly.evaluation import BinaryMetric, Metric, ProbaMetric, ThresholdMetric
from dtaianomaly.thresholding import Thresholding
from dtaianomaly.type_validation import (
    AttributeValidationMixin,
    BoolAttribute,
    IntegerAttribute,
    ListAttribute,
    NoneAttribute,
    ObjectAttribute,
    PathAttribute,
)
from dtaianomaly.utils import convert_to_list
from dtaianomaly.workflow._error_logging import log_error
from dtaianomaly.workflow._Job import Job

__all__ = ["JobBasedWorkflow"]


[docs] class JobBasedWorkflow(AttributeValidationMixin): """ Run anomaly detection experiments. Runs an experiment for each given ``Job``. If an error occurs in any execution of an anomaly detector or loading of data, then the error will be written to an error file, which is an executable Python file to reproduce the error. Parameters ---------- jobs : list of Job The jobs to execute within this workflow. metrics : Metric or list of Metric The metrics to evaluate within this workflow. thresholds : Thresholding or list of Thresholding, default=None The thresholds used for converting continuous anomaly scores to binary anomaly predictions. Each threshold will be combined with each :py:class:`~dtaianomaly.evaluation.BinaryMetric` given via the ``metrics`` parameter. The thresholds do not apply on a :py:class:`~dtaianomaly.evaluation.ProbaMetric`. If equals None or an empty list, then all the given metrics via the ``metrics`` argument must be of type :py:class:`~dtaianomaly.evaluation.ProbaMetric`. Otherwise, a ValueError will be raised. n_jobs : int, default=1 Number of processes to run in parallel while evaluating all combinations. trace_memory : bool, default=False Whether or not memory usage of each run is reported. While this might give additional insights into the models, their runtime will be higher due to additional internal bookkeeping. anomaly_scores_path : str, default=None The path where the anomaly scores should be saved. If ``None``, the anomaly scores will not be saved. error_log_path : str, default='./error_logs' The path in which the error logs should be saved. fit_unsupervised_on_test_data : bool, default=False Whether to fit the unsupervised anomaly detectors on the test data. If True, then the test data will be used to fit the detector and to evaluate the detector. This is no issue, since unsupervised detectors do not use labels and can deal with anomalies in the training data. fit_semi_supervised_on_test_data : bool, default=False Whether to fit the semi-supervised anomaly detectors on the test data. If True, then the test data will be used to fit the detector and to evaluate the detector. This is not really an issue, because it only breaks the assumption of semi-supervised methods of normal training data. However, these methods do not use the training labels themselves. show_progress : bool, default=False Whether to show the progress using a TQDM progress bar or not. .. note:: Ensure ``tqdm`` installed for this (which is not part of the core dependencies of ``dtaianomaly``). Otherwise, no progress bar will be shown. Examples -------- >>> from dtaianomaly.data import DemonstrationTimeSeriesLoader >>> from dtaianomaly.anomaly_detection import MatrixProfileDetector, IsolationForest >>> from dtaianomaly.evaluation import AreaUnderROC, AreaUnderPR >>> from dtaianomaly.preprocessing import StandardScaler, MinMaxScaler >>> from dtaianomaly.workflow import JobBasedWorkflow, Job >>> workflow = JobBasedWorkflow( ... jobs=[ ... Job( ... dataloader=DemonstrationTimeSeriesLoader(), ... detector=IsolationForest(15), ... ), ... Job( ... dataloader=DemonstrationTimeSeriesLoader(), ... preprocessor=StandardScaler(), ... detector=IsolationForest(15), ... ), ... Job( ... dataloader=DemonstrationTimeSeriesLoader(), ... preprocessor=MinMaxScaler(), ... detector=IsolationForest(15), ... ), ... ], ... metrics=[AreaUnderROC(), AreaUnderPR()] ... ) >>> workflow.run() # doctest: +SKIP """ jobs = list[Job] metrics: list[ProbaMetric] n_jobs: int trace_memory: bool anomaly_scores_path: str | None error_log_path: str fit_unsupervised_on_test_data: bool fit_semi_supervised_on_test_data: bool show_progress: bool attribute_validation = { "jobs": ListAttribute(ObjectAttribute(Job), minimum_length=1), "metrics": ListAttribute(ObjectAttribute(ProbaMetric), minimum_length=1), "n_jobs": IntegerAttribute(minimum=1), "trace_memory": BoolAttribute(), "anomaly_scores_path": PathAttribute(must_exist=False) | NoneAttribute(), "error_log_path": PathAttribute(must_exist=False), "fit_unsupervised_on_test_data": BoolAttribute(), "fit_semi_supervised_on_test_data": BoolAttribute(), "show_progress": BoolAttribute(), } def __init__( self, jobs: list[Job], metrics: Metric | list[Metric], thresholds: Thresholding | list[Thresholding] = None, n_jobs: int = 1, trace_memory: bool = False, anomaly_scores_path: str = None, error_log_path: str = "./error_logs", fit_unsupervised_on_test_data: bool = False, fit_semi_supervised_on_test_data: bool = False, show_progress: bool = False, ): # Add thresholding to the binary metrics metrics = convert_to_list(metrics) thresholds = convert_to_list(thresholds or []) proba_metrics = _convert_to_proba_metrics( metrics=metrics, thresholds=thresholds ) # Set the properties of this workflow self.jobs = jobs self.metrics = proba_metrics self.n_jobs = n_jobs self.trace_memory = trace_memory self.anomaly_scores_path = anomaly_scores_path self.error_log_path = error_log_path self.fit_unsupervised_on_test_data = fit_unsupervised_on_test_data self.fit_semi_supervised_on_test_data = fit_semi_supervised_on_test_data self.show_progress = show_progress
[docs] def run(self, **kwargs) -> pd.DataFrame: """ Run the experimental workflow. Evaluate each pipeline within this workflow on each dataset within this workflow in a grid-like manner. Parameters ---------- **kwargs Additional parameters to be passed to the `fit` method of the anomaly detector. Returns ------- pd.DataFrame A pandas dataframe with the results of this workflow. Each row represents an execution of an anomaly detector on a given dataset with some preprocessing steps. The columns correspond to the different evaluation metrics, running time and potentially also the memory usage. """ # Create all the jobs if self.show_progress: try: import tqdm except ModuleNotFoundError: warnings.warn( "Flag 'tqdm_progress' was set to True in the workflow, but tqdm is not installed!\n" "No progress will be shown using tqdm. To do so, run 'pip install tqdm'!" ) self.show_progress = False # Execute the jobs if self.n_jobs == 1: if self.show_progress: import tqdm jobs = tqdm.tqdm(self.jobs) else: jobs = self.jobs result = [ _single_job( job=job, metrics=self.metrics, trace_memory=self.trace_memory, anomaly_scores_path=self.anomaly_scores_path, error_log_path=self.error_log_path, fit_unsupervised_on_test_data=self.fit_unsupervised_on_test_data, fit_semi_supervised_on_test_data=self.fit_semi_supervised_on_test_data, **kwargs, ) for job in jobs ] else: single_run_function = partial( _single_job, metrics=self.metrics, trace_memory=self.trace_memory, anomaly_scores_path=self.anomaly_scores_path, error_log_path=self.error_log_path, fit_unsupervised_on_test_data=self.fit_unsupervised_on_test_data, fit_semi_supervised_on_test_data=self.fit_semi_supervised_on_test_data, **kwargs, ) if self.show_progress: import tqdm # Run jobs with tqdm progress bar with multiprocessing.Pool(processes=self.n_jobs) as pool: with tqdm.tqdm(total=len(self.jobs)) as pbar: result = [ pool.apply_async( single_run_function, args=(job,), callback=lambda _: pbar.update(1), ) for job in self.jobs ] pool.close() pool.join() # Wait for all processes to complete result = [r.get() for r in result] else: with multiprocessing.Pool(processes=self.n_jobs) as pool: result = pool.map(single_run_function, self.jobs) # Create a dataframe of the results results_df = pd.DataFrame(result) # Reorder the columns columns = [ "Dataset", "Detector", "Preprocessor", "Runtime Fit [s]", "Runtime Predict [s]", "Runtime [s]", ] if self.trace_memory: columns.extend( ["Peak Memory Fit [MB]", "Peak Memory Predict [MB]", "Peak Memory [MB]"] ) results_df = results_df[ columns + [x for x in results_df.columns if x not in columns] ] # Drop the processors column, if none were provided. if not self.provided_preprocessors(): results_df.drop(columns="Preprocessor", inplace=True) # Return the results return results_df
def provided_preprocessors(self) -> bool: return any(job.has_preprocessor for job in self.jobs)
def _single_job( job: Job, metrics: list[ProbaMetric], trace_memory: bool, anomaly_scores_path: str, error_log_path: str, fit_unsupervised_on_test_data: bool, fit_semi_supervised_on_test_data: bool, **kwargs, ) -> dict[str, str | float]: # Initialize the results, and by default everything went wrong ('Error') results = {"Dataset": str(job.dataloader)} for key in metrics + [ "Detector", "Preprocessor", "Runtime Fit [s]", "Runtime Predict [s]", "Runtime [s]", ]: results[str(key)] = "Error" if trace_memory: for key in [ "Peak Memory Fit [MB]", "Peak Memory Predict [MB]", "Peak Memory [MB]", ]: results[key] = "Error" # Try to load the data set, if this fails, return the results try: data_set = job.dataloader.load() except Exception as exception: results["Error file"] = log_error(error_log_path, exception, job.dataloader) return results # We can already save the used preprocessor and detector results["Preprocessor"] = str(job.pipeline.preprocessor) results["Detector"] = str(job.pipeline.detector) # Check if the dataset and the anomaly detector are compatible if not data_set.is_compatible(job.pipeline): error_message = ( f"Not compatible: detector with supervision {job.pipeline.supervision} " f"for data set with compatible supervision [" ) error_message += ", ".join([str(s) for s in data_set.compatible_supervision()]) error_message += "]" for key, value in results.items(): if value == "Error": results[key] = error_message return results # Format X_train, y_train, X_test and y_test X_test, y_test, X_train, y_train, fit_on_X_train = _get_train_test_data( data_set, job.pipeline, fit_unsupervised_on_test_data, fit_semi_supervised_on_test_data, ) # Run the anomaly detector, and catch any exceptions try: # Fitting _start_tracing_memory(trace_memory) start = _start_tracing_runtime() job.pipeline.fit(X_train, y_train, **kwargs) results["Runtime Fit [s]"] = _end_tracing_runtime(start) _end_tracing_memory(trace_memory, results, "Peak Memory Fit [MB]") # Predicting _start_tracing_memory(trace_memory) start = _start_tracing_runtime() y_pred = job.pipeline.predict_proba(X_test) results["Runtime Predict [s]"] = _end_tracing_runtime(start) _end_tracing_memory(trace_memory, results, "Peak Memory Predict [MB]") # Save the anomaly scores if anomaly_scores_path is not None: os.makedirs(anomaly_scores_path, exist_ok=True) anomaly_scores_file = tempfile.NamedTemporaryFile( dir=anomaly_scores_path, delete=False, suffix=".txt" ) np.savetxt(anomaly_scores_file.name, y_pred) results["Anomaly scores file"] = anomaly_scores_file.name # Scoring _, y_test_ = job.pipeline.preprocessor.transform(X_test, y_test) results.update( { str(metric): metric.compute(y_true=y_test_, y_pred=y_pred) for metric in metrics } ) # Aggregate the used resources results["Runtime [s]"] = ( results["Runtime Fit [s]"] + results["Runtime Predict [s]"] ) if trace_memory: results["Peak Memory [MB]"] = max( results["Peak Memory Fit [MB]"], results["Peak Memory Predict [MB]"] ) except Exception as exception: # Log the errors results["Error file"] = log_error( error_log_path, exception, job.dataloader, job.pipeline, fit_on_X_train, **kwargs, ) # Return the results return results def _convert_to_proba_metrics( metrics: list[Metric], thresholds: list[Thresholding] ) -> list[ProbaMetric]: """The given lists are assumed to be non-empty.""" proba_metrics = [] for metric in metrics: if isinstance(metric, BinaryMetric): if len(thresholds) == 0: raise ValueError( "A binary metric is given but no thresholds are given!" ) proba_metrics.extend( ThresholdMetric(thresholder=threshold, metric=metric) for threshold in thresholds ) elif isinstance(metric, ProbaMetric): proba_metrics.append(metric) return proba_metrics def _start_tracing_runtime() -> float: return time.time() def _end_tracing_runtime(start_time: float) -> float: return time.time() - start_time def _start_tracing_memory(trace_memory: bool) -> None: if trace_memory: tracemalloc.start() def _end_tracing_memory(trace_memory: bool, results, key) -> None: if trace_memory: _, peak = tracemalloc.get_traced_memory() results[key] = peak / 10**6 tracemalloc.stop() def _get_train_test_data( data_set: DataSet, detector: BaseDetector, fit_unsupervised_on_test_data: bool, fit_semi_supervised_on_test_data: bool, ) -> (np.ndarray, np.ndarray, np.ndarray, np.ndarray, bool): """ Separates the train and test data depending on the type of the anomaly detector and whether the test data should be used for fitting in an unsupervised detector. Also returns a bool indicating if the train data is actually used for fitting or not. """ X_test = data_set.X_test y_test = data_set.y_test X_train = data_set.X_train y_train = data_set.y_train fit_on_X_train = True # If no train data is given but the detector is unsupervised, then use the test data for training # This is only ok if the detector is unsupervised, because no labels are used # If this happens, the train labels will be None anyway (otherwise data_set would be invalid) if detector.supervision == Supervision.UNSUPERVISED and X_train is None: X_train = X_test fit_on_X_train = False # If unsupervised detectors should fit on the test data. if ( fit_unsupervised_on_test_data and detector.supervision == Supervision.UNSUPERVISED ): X_train = X_test fit_on_X_train = False # If semi-supervised detectors should fit on the test data. if ( fit_semi_supervised_on_test_data and detector.supervision == Supervision.SEMI_SUPERVISED ): X_train = X_test fit_on_X_train = False return X_test, y_test, X_train, y_train, fit_on_X_train