import multiprocessing
import time
import tracemalloc
from functools import partial
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd

from dtaianomaly.data.LazyDataLoader import LazyDataLoader
from dtaianomaly.data.DataSet import DataSet
from dtaianomaly.evaluation.metrics import Metric, BinaryMetric
from dtaianomaly.thresholding.thresholding import Thresholding
from dtaianomaly.preprocessing.Preprocessor import Preprocessor, Identity
from dtaianomaly.anomaly_detection.BaseDetector import BaseDetector, Supervision
from dtaianomaly.pipeline.EvaluationPipeline import EvaluationPipeline
from dtaianomaly.workflow.utils import build_pipelines, convert_to_proba_metrics, convert_to_list
from dtaianomaly.workflow.error_logging import log_error
class Workflow:
    """
    Run anomaly detection experiments.

    Run all combinations of ``dataloaders``, ``preprocessors``, ``detectors``,
    and ``metrics``. The metrics requiring a thresholding operation are
    combined with every element of ``thresholds``. If an error occurs in any
    execution of an anomaly detector or loading of data, then the error will
    be written to an error file, which is an executable Python file to reproduce
    the error.

    Parameters
    ----------
    dataloaders: LazyDataLoader or list of LazyDataLoader
        The dataloaders that will be used to load data, and consequently
        this data is used for evaluation within this workflow.
    metrics: Metric or list of Metric
        The metrics to evaluate within this workflow.
    detectors: BaseDetector or list of BaseDetector
        The anomaly detectors to evaluate.
    preprocessors: Preprocessor or list of Preprocessor, default=None
        The preprocessors to apply before evaluating the model. If equals
        None or an empty list, then no preprocessing will be done, i.e.,
        :py:class:`~dtaianomaly.preprocessing.Identity` is used as the
        preprocessor for each pipeline, and the ``'Preprocessor'`` column
        is dropped from the results.
    thresholds: Thresholding or list of Thresholding, default=None
        The thresholds used for converting continuous anomaly scores to
        binary anomaly predictions. Each threshold will be combined with
        each :py:class:`~dtaianomaly.evaluation.BinaryMetric` given via
        the ``metrics`` parameter. The thresholds do not apply to a
        :py:class:`~dtaianomaly.evaluation.ProbaMetric`. If equals None
        or an empty list, then all the given metrics via the ``metrics``
        argument must be of type :py:class:`~dtaianomaly.evaluation.ProbaMetric`.
        Otherwise, a ValueError will be raised.
    n_jobs: int, default=1
        Number of processes to run in parallel while evaluating all
        combinations.
    trace_memory: bool, default=False
        Whether or not memory usage of each run is reported. While this
        might give additional insights into the models, their runtime
        will be higher due to additional internal bookkeeping.
    error_log_path: str, default='./error_logs'
        The path in which the error logs should be saved.
    fit_unsupervised_on_test_data: bool, default=False
        Whether to fit the unsupervised anomaly detectors on the test data.
        If True, then the test data will be used to fit the detector and
        to evaluate the detector. This is no issue, since unsupervised
        detectors do not use labels and can deal with anomalies in the
        training data.

    Raises
    ------
    ValueError
        If no dataloaders, metrics or detectors are given, if ``n_jobs`` is
        smaller than 1, or if a binary metric is given without any
        thresholds.
    """
    dataloaders: List[LazyDataLoader]
    pipelines: List[EvaluationPipeline]
    provided_preprocessors: bool
    n_jobs: int
    trace_memory: bool
    error_log_path: str
    fit_unsupervised_on_test_data: bool

    def __init__(self,
                 dataloaders: Union[LazyDataLoader, List[LazyDataLoader]],
                 metrics: Union[Metric, List[Metric]],
                 detectors: Union[BaseDetector, List[BaseDetector]],
                 preprocessors: Union[Preprocessor, List[Preprocessor]] = None,
                 thresholds: Union[Thresholding, List[Thresholding]] = None,
                 n_jobs: int = 1,
                 trace_memory: bool = False,
                 error_log_path: str = './error_logs',
                 fit_unsupervised_on_test_data: bool = False):
        # Make sure the inputs are lists.
        dataloaders = convert_to_list(dataloaders)
        metrics = convert_to_list(metrics)
        detectors = convert_to_list(detectors)
        thresholds = convert_to_list(thresholds or [])
        preprocessors = convert_to_list(preprocessors or [])

        # Validate the inputs before building anything from them.
        if len(dataloaders) == 0:
            raise ValueError('At least one data loader should be given to the workflow!')
        if len(metrics) == 0:
            raise ValueError('At least one metrics should be given to the workflow!')
        if len(detectors) == 0:
            raise ValueError('At least one detectors should be given to the workflow!')
        if n_jobs < 1:
            raise ValueError('There should be at least one job within a workflow!')
        # Binary metrics can only be computed after thresholding the scores.
        if len(thresholds) == 0 and any(isinstance(metric, BinaryMetric) for metric in metrics):
            raise ValueError('There should be at least one thresholding option if a binary metric is passed!')

        # Without user-provided preprocessors, every pipeline uses the identity.
        self.provided_preprocessors = len(preprocessors) > 0
        if not self.provided_preprocessors:
            preprocessors = [Identity()]

        # Combine every binary metric with every threshold.
        proba_metrics = convert_to_proba_metrics(
            metrics=metrics,
            thresholds=thresholds
        )

        # Set the properties of this workflow
        self.pipelines = build_pipelines(
            preprocessors=preprocessors,
            detectors=detectors,
            metrics=proba_metrics
        )
        self.dataloaders = dataloaders
        self.n_jobs = n_jobs
        self.trace_memory = trace_memory
        self.error_log_path = error_log_path
        self.fit_unsupervised_on_test_data = fit_unsupervised_on_test_data

    def run(self) -> pd.DataFrame:
        """
        Run the experimental workflow. Evaluate each pipeline within this
        workflow on each dataset within this workflow in a grid-like manner.

        Returns
        -------
        results: pd.DataFrame
            A pandas dataframe with the results of this workflow. Each row
            represents an execution of an anomaly detector on a given dataset
            with some preprocessing steps. The columns correspond to the
            different evaluation metrics, running time and potentially also
            the memory usage.
        """
        # Create all the jobs: every pipeline on every data set.
        unit_jobs = [
            (dataloader, pipeline)
            for dataloader in self.dataloaders
            for pipeline in self.pipelines
        ]

        # The same configured job function is used for serial and parallel runs.
        single_run_function = partial(
            _single_job,
            trace_memory=self.trace_memory,
            error_log_path=self.error_log_path,
            fit_unsupervised_on_test_data=self.fit_unsupervised_on_test_data
        )

        # Execute the jobs
        if self.n_jobs == 1:
            result = [single_run_function(*job) for job in unit_jobs]
        else:
            # Spawning more processes than there are jobs only adds overhead.
            n_processes = max(1, min(self.n_jobs, len(unit_jobs)))
            with multiprocessing.Pool(processes=n_processes) as pool:
                result = pool.starmap(single_run_function, unit_jobs)

        # Create a dataframe of the results
        results_df = pd.DataFrame(result)

        # Put the identifying columns first, followed by all other columns.
        columns = ['Dataset', 'Detector', 'Preprocessor', 'Runtime [s]']
        if self.trace_memory:
            columns.append('Peak Memory [MB]')
        results_df = results_df[columns + [x for x in results_df.columns if x not in columns]]

        # Drop the preprocessor column, if none were provided.
        if not self.provided_preprocessors:
            results_df = results_df.drop(columns='Preprocessor')

        return results_df
def _single_job(dataloader: LazyDataLoader, pipeline: EvaluationPipeline, trace_memory: bool, error_log_path: str, fit_unsupervised_on_test_data: bool) -> Dict[str, Union[str, float]]:
    """
    Evaluate a single pipeline on a single data set.

    Parameters
    ----------
    dataloader: LazyDataLoader
        Loader for the data set to evaluate on.
    pipeline: EvaluationPipeline
        The pipeline to evaluate; its ``metrics`` provide the metric columns.
    trace_memory: bool
        Whether to record the peak memory traced by ``tracemalloc`` while
        the pipeline runs.
    error_log_path: str
        Location passed to ``log_error`` when an error occurs.
    fit_unsupervised_on_test_data: bool
        Whether unsupervised detectors are fitted on the test data.

    Returns
    -------
    results: Dict[str, Union[str, float]]
        One row of results. Every field starts as ``'Error'`` and is only
        overwritten by the step that fills it in. If loading the data or
        running the pipeline raises, the value returned by ``log_error`` is
        stored under ``'Error file'``. If the data set and the pipeline are
        not compatible, every remaining ``'Error'`` field holds an
        explanatory message instead.
    """
    # Initialize the results, and by default everything went wrong ('Error')
    results = {'Dataset': str(dataloader)}
    for key in pipeline.metrics + ['Detector', 'Preprocessor', 'Runtime [s]']:
        results[str(key)] = 'Error'
    if trace_memory:
        results['Peak Memory [MB]'] = 'Error'

    # Try to load the data set; if this fails, log it and return the results.
    try:
        data_set = dataloader.load()
    except Exception as exception:
        results['Error file'] = log_error(error_log_path, exception, dataloader)
        return results

    # We can already save the used preprocessor and detector
    results['Preprocessor'] = str(pipeline.pipeline.preprocessor)
    results['Detector'] = str(pipeline.pipeline.detector)

    # Check if the dataset and the anomaly detector are compatible
    if not data_set.is_compatible(pipeline.pipeline):
        compatible = ', '.join([str(s) for s in data_set.compatible_supervision()])
        error_message = (
            f'Not compatible: detector with supervision {pipeline.pipeline.supervision} '
            f'for data set with compatible supervision [{compatible}]'
        )
        for key, value in results.items():
            if value == 'Error':
                results[key] = error_message
        return results

    # Format X_train, y_train, X_test and y_test
    X_test, y_test, X_train, y_train, fit_on_X_train = _get_train_test_data(data_set, pipeline.pipeline, fit_unsupervised_on_test_data)

    # Evaluate the pipeline while measuring time (and memory, if requested).
    if trace_memory:
        tracemalloc.start()
    start = time.perf_counter()
    try:
        run_results = pipeline.run(
            X_test=X_test,
            y_test=y_test,
            X_train=X_train,
            y_train=y_train
        )
    except Exception as exception:
        # Stop measuring before logging, so that writing the error file is not
        # attributed to the pipeline. log_error is still called inside the
        # except clause, so the active exception context remains available.
        runtime, peak_memory = _stop_measurements(start, trace_memory)
        results['Error file'] = log_error(error_log_path, exception, dataloader, pipeline.pipeline, fit_on_X_train)
    else:
        runtime, peak_memory = _stop_measurements(start, trace_memory)
        results.update(run_results)
    finally:
        # Never leave tracemalloc running, e.g. if a KeyboardInterrupt escapes
        # or log_error itself raises.
        if trace_memory and tracemalloc.is_tracing():
            tracemalloc.stop()

    # The measured values take precedence over anything the pipeline returned.
    results['Runtime [s]'] = runtime
    if trace_memory:
        results['Peak Memory [MB]'] = peak_memory

    return results


def _stop_measurements(start: float, trace_memory: bool) -> Tuple[float, Optional[float]]:
    """
    Return the seconds elapsed since ``start`` (a ``time.perf_counter()``
    value) and, if ``trace_memory``, the peak traced memory in MB, stopping
    ``tracemalloc``. The peak is None when memory is not traced.
    """
    runtime = time.perf_counter() - start
    peak_memory = None
    if trace_memory:
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        peak_memory = peak / 10 ** 6
    return runtime, peak_memory
def _get_train_test_data(data_set: DataSet, detector: BaseDetector, fit_unsupervised_on_test_data: bool) -> Tuple[np.ndarray, np.ndarray, Optional[np.ndarray], Optional[np.ndarray], bool]:
    """
    Separate the train and test data, depending on the supervision of the
    anomaly detector and on whether the test data should be used for fitting
    an unsupervised detector.

    Parameters
    ----------
    data_set: DataSet
        The data set providing ``X_test``, ``y_test``, ``X_train`` and ``y_train``.
    detector: BaseDetector
        Object whose ``supervision`` attribute decides which data is used for
        fitting (this module passes the detection pipeline here).
    fit_unsupervised_on_test_data: bool
        Whether unsupervised detectors should be fitted on the test data,
        even if train data is available.

    Returns
    -------
    X_test, y_test, X_train, y_train, fit_on_X_train
        The test data, the data to fit on, and a bool indicating if the
        train data is actually used for fitting. When it is not (the flag is
        False), ``X_train`` is ``X_test`` and ``y_train`` is None, so that the
        training features and labels never come from different splits.
    """
    X_test = data_set.X_test
    y_test = data_set.y_test
    X_train = data_set.X_train
    y_train = data_set.y_train
    fit_on_X_train = True

    # Unsupervised detectors use no labels, so they may be fitted on the test
    # data: either because no train data is given, or because it is requested.
    # The train labels are dropped as well, because they belong to the train
    # split and would not match X_test. When X_train is None, y_train is
    # already None (otherwise data_set would be invalid).
    if detector.supervision == Supervision.UNSUPERVISED and (X_train is None or fit_unsupervised_on_test_data):
        X_train = X_test
        y_train = None
        fit_on_X_train = False

    return X_test, y_test, X_train, y_train, fit_on_X_train