from dtaianomaly.anomaly_detection import BaseDetector
from dtaianomaly.data import LazyDataLoader
from dtaianomaly.evaluation import Metric
from dtaianomaly.preprocessing import Preprocessor
from dtaianomaly.thresholding import Thresholding
from dtaianomaly.type_validation import ListAttribute, ObjectAttribute
from dtaianomaly.utils import convert_to_list
from dtaianomaly.workflow._Job import Job
from dtaianomaly.workflow._JobBasedWorkflow import JobBasedWorkflow
__all__ = ["Workflow"]
[docs]
class Workflow(JobBasedWorkflow):
"""
Run anomaly detection experiments.
Run all combinations of ``dataloaders``, ``preprocessors``, ``detectors``,
and ``metrics``. The metrics requiring a thresholding operation are
combined with every element of ``thresholds``. If an error occurs in any
execution of an anomaly detector or loading of data, then the error will
be written to an error file, which is an executable Python file to reproduce
the error.
Parameters
----------
dataloaders : LazyDataLoader or list of LazyDataLoader
The dataloaders that will be used to load data, and consequently
this data is used for evaluation within this workflow.
metrics : Metric or list of Metric
The metrics to evaluate within this workflow.
detectors : BaseDetector or list of BaseDetector
The anomaly detectors to evaluate.
preprocessors : Preprocessor or list of Preprocessor, default=None
The preprocessors to apply before evaluating the model. If equals
None or an empty list, then no preprocessing will be done, aka.
using :py:class:`dtaianomaly.preprocessing.Preprocessor` as the
preprocessor for each pipeline.
thresholds : Thresholding or list of Thresholding, default=None
The thresholds used for converting continuous anomaly scores to
binary anomaly predictions. Each threshold will be combined with
each :py:class:`~dtaianomaly.evaluation.BinaryMetric` given via
the ``metrics`` parameter. The thresholds do not apply on a
:py:class:`~dtaianomaly.evaluation.ProbaMetric`. If equals None
or an empty list, then all the given metrics via the ``metrics``
argument must be of type :py:class:`~dtaianomaly.evaluation.ProbaMetric`.
Otherwise, a ValueError will be raised.
n_jobs : int, default=1
Number of processes to run in parallel while evaluating all
combinations.
trace_memory : bool, default=False
Whether or not memory usage of each run is reported. While this
might give additional insights into the models, their runtime
will be higher due to additional internal bookkeeping.
anomaly_scores_path : str, default=None
The path where the anomaly scores should be saved. If ``None``, the
anomaly scores will not be saved.
error_log_path : str, default='./error_logs'
The path in which the error logs should be saved.
fit_unsupervised_on_test_data : bool, default=False
Whether to fit the unsupervised anomaly detectors on the test data.
If True, then the test data will be used to fit the detector and
to evaluate the detector. This is no issue, since unsupervised
detectors do not use labels and can deal with anomalies in the
training data.
fit_semi_supervised_on_test_data : bool, default=False
Whether to fit the semi-supervised anomaly detectors on the test data.
If True, then the test data will be used to fit the detector and
to evaluate the detector. This is not really an issue, because it only
breaks the assumption of semi-supervised methods of normal training data.
However, these methods do not use the training labels themselves.
show_progress : bool, default=False
Whether to show the progress using a TQDM progress bar or not.
.. note::
Ensure ``tqdm`` installed for this (which is not part of the core
dependencies of ``dtaianomaly``). Otherwise, no progress bar will
be shown.
Examples
--------
>>> from dtaianomaly.data import DemonstrationTimeSeriesLoader
>>> from dtaianomaly.anomaly_detection import MatrixProfileDetector, IsolationForest
>>> from dtaianomaly.evaluation import AreaUnderROC, AreaUnderPR
>>> from dtaianomaly.workflow import Workflow
>>> workflow = Workflow(
... dataloaders=DemonstrationTimeSeriesLoader(),
... detectors=[MatrixProfileDetector(window_size=100), IsolationForest(15)],
... metrics=[AreaUnderROC(), AreaUnderPR()]
... )
>>> workflow.run() # doctest: +SKIP
"""
dataloaders: list[LazyDataLoader]
preprocessors: list[Preprocessor]
detectors: list[BaseDetector]
attribute_validation = {
"dataloaders": ListAttribute(ObjectAttribute(LazyDataLoader), minimum_length=1),
"preprocessors": ListAttribute(ObjectAttribute(Preprocessor)),
"detectors": ListAttribute(ObjectAttribute(BaseDetector), minimum_length=1),
}
def __init__(
self,
dataloaders: LazyDataLoader | list[LazyDataLoader],
metrics: Metric | list[Metric],
detectors: BaseDetector | list[BaseDetector],
preprocessors: Preprocessor | list[Preprocessor] = None,
thresholds: Thresholding | list[Thresholding] = None,
n_jobs: int = 1,
trace_memory: bool = False,
anomaly_scores_path: str = None,
error_log_path: str = "./error_logs",
fit_unsupervised_on_test_data: bool = False,
fit_semi_supervised_on_test_data: bool = False,
show_progress: bool = False,
):
# Make sure the inputs are lists.
dataloaders = convert_to_list(dataloaders)
preprocessors = convert_to_list(preprocessors or [])
detectors = convert_to_list(detectors)
# Initialize the jobs
if len(preprocessors) == 0:
jobs = [
Job(dataloader, detector)
for dataloader in dataloaders
for detector in detectors
]
else:
jobs = [
Job(dataloader, detector, preprocessor)
for dataloader in dataloaders
for preprocessor in preprocessors
for detector in detectors
]
super().__init__(
jobs=jobs,
metrics=metrics,
thresholds=thresholds,
n_jobs=n_jobs,
trace_memory=trace_memory,
anomaly_scores_path=anomaly_scores_path,
error_log_path=error_log_path,
fit_unsupervised_on_test_data=fit_unsupervised_on_test_data,
fit_semi_supervised_on_test_data=fit_semi_supervised_on_test_data,
show_progress=show_progress,
)
self.dataloaders = dataloaders
self.preprocessors = preprocessors
self.detectors = detectors