# Source code for dtaianomaly.workflow._Workflow

from dtaianomaly.anomaly_detection import BaseDetector
from dtaianomaly.data import LazyDataLoader
from dtaianomaly.evaluation import Metric
from dtaianomaly.preprocessing import Preprocessor
from dtaianomaly.thresholding import Thresholding
from dtaianomaly.type_validation import ListAttribute, ObjectAttribute
from dtaianomaly.utils import convert_to_list
from dtaianomaly.workflow._Job import Job
from dtaianomaly.workflow._JobBasedWorkflow import JobBasedWorkflow

# Public API of this module: only ``Workflow`` is exported by ``from ... import *``.
__all__ = ["Workflow"]


class Workflow(JobBasedWorkflow):
    """
    Run anomaly detection experiments.

    Run all combinations of ``dataloaders``, ``preprocessors``, ``detectors``,
    and ``metrics``. The metrics requiring a thresholding operation are
    combined with every element of ``thresholds``.

    If an error occurs in any execution of an anomaly detector or loading
    of data, then the error will be written to an error file, which is an
    executable Python file to reproduce the error.

    Parameters
    ----------
    dataloaders : LazyDataLoader or list of LazyDataLoader
        The dataloaders that will be used to load data, and consequently
        this data is used for evaluation within this workflow.
    metrics : Metric or list of Metric
        The metrics to evaluate within this workflow.
    detectors : BaseDetector or list of BaseDetector
        The anomaly detectors to evaluate.
    preprocessors : Preprocessor or list of Preprocessor, default=None
        The preprocessors to apply before evaluating the model. If equals
        None or an empty list, then no preprocessing will be done, i.e.,
        using :py:class:`dtaianomaly.preprocessing.Preprocessor` as the
        preprocessor for each pipeline.
    thresholds : Thresholding or list of Thresholding, default=None
        The thresholds used for converting continuous anomaly scores to
        binary anomaly predictions. Each threshold will be combined with
        each :py:class:`~dtaianomaly.evaluation.BinaryMetric` given via
        the ``metrics`` parameter. The thresholds do not apply on a
        :py:class:`~dtaianomaly.evaluation.ProbaMetric`. If equals None
        or an empty list, then all the given metrics via the ``metrics``
        argument must be of type :py:class:`~dtaianomaly.evaluation.ProbaMetric`.
        Otherwise, a ValueError will be raised.
    n_jobs : int, default=1
        Number of processes to run in parallel while evaluating all
        combinations.
    trace_memory : bool, default=False
        Whether or not memory usage of each run is reported. While this
        might give additional insights into the models, their runtime
        will be higher due to additional internal bookkeeping.
    anomaly_scores_path : str, default=None
        The path where the anomaly scores should be saved. If ``None``,
        the anomaly scores will not be saved.
    error_log_path : str, default='./error_logs'
        The path in which the error logs should be saved.
    fit_unsupervised_on_test_data : bool, default=False
        Whether to fit the unsupervised anomaly detectors on the test data.
        If True, then the test data will be used to fit the detector and
        to evaluate the detector. This is no issue, since unsupervised
        detectors do not use labels and can deal with anomalies in the
        training data.
    fit_semi_supervised_on_test_data : bool, default=False
        Whether to fit the semi-supervised anomaly detectors on the test data.
        If True, then the test data will be used to fit the detector and
        to evaluate the detector. This is not really an issue, because it
        only breaks the assumption of semi-supervised methods of normal
        training data. However, these methods do not use the training labels
        themselves.
    show_progress : bool, default=False
        Whether to show the progress using a TQDM progress bar or not.

        .. note::
           Ensure ``tqdm`` is installed for this (which is not part of the
           core dependencies of ``dtaianomaly``). Otherwise, no progress bar
           will be shown.

    Examples
    --------
    >>> from dtaianomaly.data import DemonstrationTimeSeriesLoader
    >>> from dtaianomaly.anomaly_detection import MatrixProfileDetector, IsolationForest
    >>> from dtaianomaly.evaluation import AreaUnderROC, AreaUnderPR
    >>> from dtaianomaly.workflow import Workflow
    >>> workflow = Workflow(
    ...     dataloaders=DemonstrationTimeSeriesLoader(),
    ...     detectors=[MatrixProfileDetector(window_size=100), IsolationForest(15)],
    ...     metrics=[AreaUnderROC(), AreaUnderPR()]
    ... )
    >>> workflow.run()  # doctest: +SKIP
    """

    dataloaders: list[LazyDataLoader]
    preprocessors: list[Preprocessor]
    detectors: list[BaseDetector]

    # At least one dataloader and one detector are required; the list of
    # preprocessors has no minimum length and may therefore be empty.
    attribute_validation = {
        "dataloaders": ListAttribute(ObjectAttribute(LazyDataLoader), minimum_length=1),
        "preprocessors": ListAttribute(ObjectAttribute(Preprocessor)),
        "detectors": ListAttribute(ObjectAttribute(BaseDetector), minimum_length=1),
    }

    def __init__(
        self,
        dataloaders: LazyDataLoader | list[LazyDataLoader],
        metrics: Metric | list[Metric],
        detectors: BaseDetector | list[BaseDetector],
        preprocessors: Preprocessor | list[Preprocessor] | None = None,
        thresholds: Thresholding | list[Thresholding] | None = None,
        n_jobs: int = 1,
        trace_memory: bool = False,
        anomaly_scores_path: str | None = None,
        error_log_path: str = "./error_logs",
        fit_unsupervised_on_test_data: bool = False,
        fit_semi_supervised_on_test_data: bool = False,
        show_progress: bool = False,
    ) -> None:
        # Make sure the inputs are lists. ``None`` (or an empty value) for
        # the preprocessors is normalised to an empty list.
        dataloaders = convert_to_list(dataloaders)
        preprocessors = convert_to_list(preprocessors or [])
        detectors = convert_to_list(detectors)

        # Initialize the jobs: one job per combination of the given components.
        if not preprocessors:
            # No preprocessor given: build the jobs without passing one.
            jobs = [
                Job(dataloader, detector)
                for dataloader in dataloaders
                for detector in detectors
            ]
        else:
            jobs = [
                Job(dataloader, detector, preprocessor)
                for dataloader in dataloaders
                for preprocessor in preprocessors
                for detector in detectors
            ]

        super().__init__(
            jobs=jobs,
            metrics=metrics,
            thresholds=thresholds,
            n_jobs=n_jobs,
            trace_memory=trace_memory,
            anomaly_scores_path=anomaly_scores_path,
            error_log_path=error_log_path,
            fit_unsupervised_on_test_data=fit_unsupervised_on_test_data,
            fit_semi_supervised_on_test_data=fit_semi_supervised_on_test_data,
            show_progress=show_progress,
        )

        # Keep the normalised (list-valued) components on the instance.
        self.dataloaders = dataloaders
        self.preprocessors = preprocessors
        self.detectors = detectors