Source code for dtaianomaly.workflow.Workflow

from dtaianomaly.anomaly_detection.BaseDetector import BaseDetector
from dtaianomaly.data.LazyDataLoader import LazyDataLoader
from dtaianomaly.evaluation.metrics import Metric
from dtaianomaly.preprocessing.Preprocessor import Preprocessor
from dtaianomaly.thresholding.thresholding import Thresholding
from dtaianomaly.workflow.JobBasedWorkflow import Job, JobBasedWorkflow
from dtaianomaly.workflow.utils import convert_to_list


[docs] class Workflow(JobBasedWorkflow): """ Run anomaly detection experiments Run all combinations of ``dataloaders``, ``preprocessors``, ``detectors``, and ``metrics``. The metrics requiring a thresholding operation are combined with every element of ``thresholds``. If an error occurs in any execution of an anomaly detector or loading of data, then the error will be written to an error file, which is an executable Python file to reproduce the error. Parameters ---------- dataloaders: LazyDataLoader or list of LazyDataLoader The dataloaders that will be used to load data, and consequently this data is used for evaluation within this workflow. metrics: Metric or list of Metric The metrics to evaluate within this workflow. detectors: BaseDetector or list of BaseDetector The anomaly detectors to evaluate. thresholds: Thresholding or list of Thresholding, default=None The thresholds used for converting continuous anomaly scores to binary anomaly predictions. Each threshold will be combined with each :py:class:`~dtaianomaly.evaluation.BinaryMetric` given via the ``metrics`` parameter. The thresholds do not apply on a :py:class:`~dtaianomaly.evaluation.ProbaMetric`. If equals None or an empty list, then all the given metrics via the ``metrics`` argument must be of type :py:class:`~dtaianomaly.evaluation.ProbaMetric`. Otherwise, a ValueError will be raised. preprocessors: Preprocessor or list of Preprocessor, default=None The preprocessors to apply before evaluating the model. If equals None or an empty list, then no preprocessing will be done, aka. using :py:class:`dtaianomaly.preprocessing.Preprocessor` as the preprocessor for each pipeline. n_jobs: int, default=1 Number of processes to run in parallel while evaluating all combinations. trace_memory: bool, default=False Whether or not memory usage of each run is reported. While this might give additional insights into the models, their runtime will be higher due to additional internal bookkeeping. error_log_path: str, default='./error_logs' The path in which the error logs should be saved. fit_unsupervised_on_test_data: bool, default=False Whether to fit the unsupervised anomaly detectors on the test data. If True, then the test data will be used to fit the detector and to evaluate the detector. This is no issue, since unsupervised detectors do not use labels and can deal with anomalies in the training data. fit_semi_supervised_on_test_data: bool, default=False Whether to fit the semi-supervised anomaly detectors on the test data. If True, then the test data will be used to fit the detector and to evaluate the detector. This is not really an issue, because it only breaks the assumption of semi-supervised methods of normal training data. However, these methods do not use the training labels themselves. show_progress: bool, default=False Whether to show the progress using a TQDM progress bar or not. .. note:: Ensure ``tqdm`` installed for this (which is not part of the core dependencies of ``dtaianomaly``). Otherwise, no progress bar will be shown. Attributes ---------- jobs: list of Job The jobs to execute within this workflow. """ dataloaders: list[LazyDataLoader] preprocessors: list[Preprocessor] detectors: list[BaseDetector] def __init__( self, dataloaders: LazyDataLoader | list[LazyDataLoader], metrics: Metric | list[Metric], detectors: BaseDetector | list[BaseDetector], preprocessors: Preprocessor | list[Preprocessor] = None, thresholds: Thresholding | list[Thresholding] = None, n_jobs: int = 1, trace_memory: bool = False, error_log_path: str = "./error_logs", fit_unsupervised_on_test_data: bool = False, fit_semi_supervised_on_test_data: bool = False, show_progress: bool = False, ): # Make sure the inputs are lists. dataloaders = convert_to_list(dataloaders) preprocessors = convert_to_list(preprocessors or []) detectors = convert_to_list(detectors) # Initialize the jobs if len(preprocessors) == 0: jobs = [ Job(dataloader, None, detector) for dataloader in dataloaders for detector in detectors ] else: jobs = [ Job(dataloader, preprocessor, detector) for dataloader in dataloaders for preprocessor in preprocessors for detector in detectors ] super().__init__( jobs=jobs, metrics=metrics, thresholds=thresholds, n_jobs=n_jobs, trace_memory=trace_memory, error_log_path=error_log_path, fit_unsupervised_on_test_data=fit_unsupervised_on_test_data, fit_semi_supervised_on_test_data=fit_semi_supervised_on_test_data, show_progress=show_progress, ) # Set the properties of this workflow self.dataloaders = dataloaders self.preprocessors = preprocessors self.detectors = detectors