Source code for dtaianomaly.anomaly_detection.BaseNeuralDetector

import abc
import copy
from collections.abc import Callable
from typing import Any, Literal

import numpy as np
import torch

from dtaianomaly import utils
from dtaianomaly.anomaly_detection.BaseDetector import BaseDetector, Supervision
from dtaianomaly.anomaly_detection.windowing_utils import (
    check_is_valid_window_size,
    compute_window_size,
    reverse_sliding_window,
)

# Names of the built-in optimizers that can be selected by string.
_OPTIMIZER_TYPE = Literal["adam", "sgd"]
# Compilation modes accepted by the detector (forwarded as ``mode`` when the
# network is compiled).
_COMPILE_MODE_TYPE = Literal[
    "default", "reduce-overhead", "max-autotune", "max-autotune-no-cudagraphs"
]
# Names of the supported activation functions.
_ACTIVATION_FUNCTION_TYPE = Literal["linear", "relu", "sigmoid", "tanh"]
# Type of the model parameters handed to an optimizer; deliberately
# unconstrained. ``typing.Any`` is used because the builtin ``any`` is a
# function, not a type.
_MODEL_PARAMETERS_TYPE = Any


####################################################################
# BASE NEURAL DETECTOR
####################################################################


class BaseNeuralDetector(BaseDetector, abc.ABC):
    """
    Base class for neural anomaly detectors, based on PyTorch. This class
    implements the main functionality for training a model and detecting
    anomalies, including building the data loader, building the optimizer,
    and implementing the main train and evaluation loops. Extensions of this
    class should also implement methods to build the data set, the neural
    architecture, and how to train and evaluate on a single batch.

    Parameters
    ----------
    supervision: Supervision
        The type of supervision this anomaly detector requires.
    window_size: int or str
        The window size to use for extracting sliding windows from the time series. This
        value will be passed to :py:meth:`~dtaianomaly.anomaly_detection.compute_window_size`.
    stride: int, default=1
        The stride, i.e., the step size for extracting sliding windows from the time series.
    standard_scaling: bool, default=True
        Whether to standard scale each window independently, before feeding it to the network.
    batch_size: int, default=32
        The size of the batches to feed to the network.
    data_loader_kwargs: dictionary, default=None
        Additional kwargs to be passed to the data loader. For more information, see:
        https://docs.pytorch.org/docs/stable/data.html
    optimizer: {"adam", "sgd"} or callable, default="adam"
        The optimizer to use for learning the weights. If "adam" is given,
        then the torch.optim.Adam optimizer will be used. If "sgd" is given,
        then the torch.optim.SGD optimizer will be used. Otherwise, a callable
        should be given, which takes as input the network parameters, and then
        creates an optimizer.
    learning_rate: float, default=1e-3
        The learning rate to use for training the network. Has no effect
        if `optimizer` is a callable.
    compile_model: bool, default=False
        Whether the network architecture should be compiled or not before
        training the weights. For more information, see:
        https://docs.pytorch.org/docs/stable/generated/torch.compile.html
    compile_mode: {"default", "reduce-overhead", "max-autotune", "max-autotune-no-cudagraphs"}, default="default"
        Method to compile the architecture. For more information, see:
        https://docs.pytorch.org/docs/stable/generated/torch.compile.html
    n_epochs: int, default=10
        The number of epochs for which the neural network should be trained.
    loss_function: torch.nn.Module, default=torch.nn.MSELoss()
        The loss function to use for updating the weights.
    device: str, default="cpu"
        The device on which the neural network should be trained. For more information, see:
        https://docs.pytorch.org/docs/stable/tensor_attributes.html#torch-device
    seed: int, default=None
        The seed used for training the model. This seed will update the torch
        and numpy seed at the beginning of the fit method.

    Attributes
    ----------
    window_size_: int
        The effectively used window size for this anomaly detector.
    optimizer_: torch.optim.Optimizer
        The optimizer used for learning the weights of the network.
    neural_network_: torch.nn.Module
        The PyTorch network architecture.

    See also
    --------
    BaseNeuralForecastingDetector: Use a neural network to forecast the time
        series, and detect anomalies by measuring the difference with the
        actual observations.
    BaseNeuralReconstructionDetector: Use a neural network to reconstruct
        windows in the time series, and detect anomalies as windows that are
        incorrectly reconstructed.
    """

    _OPTIMIZERS: dict[_OPTIMIZER_TYPE, type[torch.optim.Optimizer]] = {
        "adam": torch.optim.Adam,
        "sgd": torch.optim.SGD,
    }
    _ACTIVATION_FUNCTIONS: dict[_ACTIVATION_FUNCTION_TYPE, type[torch.nn.Module]] = {
        "linear": torch.nn.Identity,
        "relu": torch.nn.ReLU,
        "sigmoid": torch.nn.Sigmoid,
        "tanh": torch.nn.Tanh,
    }

    # Preprocessing related parameters
    window_size: int | str
    stride: int
    standard_scaling: bool
    # Data loading related parameters
    batch_size: int
    data_loader_kwargs: dict[str, Any] | None
    # Optimizer related parameters
    optimizer: (
        _OPTIMIZER_TYPE | Callable[[_MODEL_PARAMETERS_TYPE], torch.optim.Optimizer]
    )
    learning_rate: float
    # Model compilation
    compile_model: bool
    compile_mode: _COMPILE_MODE_TYPE
    # Training related parameters
    n_epochs: int
    loss_function: torch.nn.Module
    # General parameters
    device: str
    seed: int | None

    # Learned parameters
    window_size_: int
    optimizer_: torch.optim.Optimizer
    neural_network_: torch.nn.Module

    def __init__(
        self,
        supervision: Supervision,
        window_size: str | int,
        stride: int = 1,
        standard_scaling: bool = True,
        batch_size: int = 32,
        data_loader_kwargs: dict[str, Any] | None = None,
        optimizer: (
            _OPTIMIZER_TYPE | Callable[[_MODEL_PARAMETERS_TYPE], torch.optim.Optimizer]
        ) = "adam",
        learning_rate: float = 1e-3,
        compile_model: bool = False,
        compile_mode: _COMPILE_MODE_TYPE = "default",
        n_epochs: int = 10,
        loss_function: torch.nn.Module = torch.nn.MSELoss(),
        device: str = "cpu",
        seed: int | None = None,
    ):
        """
        Validate and store the hyperparameters.

        Raises
        ------
        TypeError
            If a parameter has the wrong type.
        ValueError
            If a parameter has the right type but an invalid value, including
            an unavailable or invalid device.
        """
        super().__init__(supervision)

        # Check preprocessing related parameters
        check_is_valid_window_size(window_size)

        # ``bool`` is a subclass of ``int``, hence the explicit exclusion.
        if not isinstance(stride, int) or isinstance(stride, bool):
            raise TypeError("`stride` should be an integer")
        if stride < 1:
            raise ValueError("`stride` should be strictly positive")

        if not isinstance(standard_scaling, bool):
            raise TypeError("`standard_scaling` should be a bool")

        # Check the data related parameters
        if not isinstance(batch_size, int) or isinstance(batch_size, bool):
            raise TypeError("`batch_size` should be an integer")
        if batch_size < 1:
            raise ValueError("`batch_size` should be strictly positive")

        if data_loader_kwargs is not None:
            if not isinstance(data_loader_kwargs, dict):
                raise TypeError("`data_loader_kwargs` should be a dictionary")

        # Check the optimizer related parameters
        if not (isinstance(optimizer, str) or callable(optimizer)):
            raise TypeError("`optimizer` should be a string or callable")
        # Test ``callable`` first: the membership test hashes its operand,
        # which would fail for an unhashable callable object.
        if not callable(optimizer) and optimizer not in self._OPTIMIZERS:
            raise ValueError(
                f"Invalid value for `optimizer` given: '{optimizer}'. Valid options are {list(self._OPTIMIZERS.keys())}"
            )

        if not isinstance(learning_rate, (float, int)) or isinstance(
            learning_rate, bool
        ):
            raise TypeError("`learning_rate` should be numerical")
        if learning_rate <= 0:
            raise ValueError("`learning_rate` should be strictly positive")

        # Check the training related parameters
        if not isinstance(loss_function, torch.nn.Module):
            raise TypeError("`loss_function` should be a torch.nn.Module")

        if not isinstance(n_epochs, int) or isinstance(n_epochs, bool):
            raise TypeError("`n_epochs` should be an integer")
        if n_epochs < 1:
            raise ValueError("`n_epochs` should be strictly positive")

        # Check model compilation parameters
        if not isinstance(compile_model, bool):
            raise TypeError("`compile_model` should be a bool")

        if not isinstance(compile_mode, str):
            raise TypeError("`compile_mode` should be a string")
        valid_compile_modes = [
            "default",
            "reduce-overhead",
            "max-autotune",
            "max-autotune-no-cudagraphs",
        ]
        if compile_mode not in valid_compile_modes:
            raise ValueError(
                f"Invalid value for `compile_mode` given: '{compile_mode}'. Valid options are {valid_compile_modes}"
            )

        # Check the device
        if not isinstance(device, str):
            raise TypeError("`device` should be a string")
        # Check CUDA availability if it's a CUDA device
        if device.startswith("cuda"):
            if not torch.cuda.is_available():
                raise ValueError(
                    f"Cuda-device given ('{device}'), but no cuda is available!"
                )
            device_index = int(device.split(":")[1]) if ":" in device else None
            if device_index is not None and device_index >= torch.cuda.device_count():
                raise ValueError(
                    f"Cuda-index given ('{device_index}'), but only {torch.cuda.device_count()} are available!"
                )
        try:
            torch.device(device)  # Try to initialize a device
        except RuntimeError as error:
            # Raise a ValueError instead for consistency, keeping the cause.
            raise ValueError(f"Invalid input device: {device}") from error

        # Initialize the variables
        self.window_size = window_size
        self.stride = stride
        self.standard_scaling = standard_scaling
        self.batch_size = batch_size
        self.data_loader_kwargs = data_loader_kwargs
        self.optimizer = optimizer
        self.learning_rate = learning_rate
        self.loss_function = loss_function
        self.compile_model = compile_model
        self.compile_mode = compile_mode
        self.n_epochs = n_epochs
        self.device = device
        self.seed = seed

        # Test building the optimizer and the data loader on dummy inputs, so
        # that invalid kwargs or callables already fail at construction time.
        self._build_data_loader(torch.utils.data.TensorDataset(torch.empty((10, 3))))
        self._build_optimizer(
            [torch.nn.Parameter(torch.randn(3, 3, requires_grad=True))]
        )

    @abc.abstractmethod
    def _build_dataset(self, X: np.ndarray) -> torch.utils.data.Dataset:
        """Abstract method to build the dataset."""

    @abc.abstractmethod
    def _build_architecture(self, n_attributes: int) -> torch.nn.Module:
        """Abstract method to build the architecture."""

    @abc.abstractmethod
    def _train_batch(self, batch: list[torch.Tensor]) -> float:
        """
        Abstract method to train the network on a single batch.

        The returned value is accumulated into the epoch loss by the
        training loop.
        """

    @abc.abstractmethod
    def _evaluate_batch(self, batch: list[torch.Tensor]) -> torch.Tensor:
        """
        Abstract method to evaluate the network on a single batch.

        The returned tensor is moved to the CPU and written into the array of
        decision scores; its first dimension determines how many entries of
        that array are filled.
        """

    def _set_seed(self) -> None:
        """Seed the global torch and numpy generators if a seed was given."""
        if self.seed is not None:
            torch.manual_seed(self.seed)
            np.random.seed(self.seed)

    def _build_data_loader(
        self, dataset: torch.utils.data.Dataset, shuffle: bool | None = None
    ) -> torch.utils.data.DataLoader:
        """
        Build a data loader for the given dataset.

        The user-provided ``data_loader_kwargs`` are copied (never mutated),
        ``batch_size`` is always overwritten with ``self.batch_size``, and
        ``shuffle`` is only overwritten when it is explicitly given.
        """
        kwargs = (
            {} if self.data_loader_kwargs is None else self.data_loader_kwargs.copy()
        )
        kwargs["batch_size"] = self.batch_size
        if shuffle is not None:
            kwargs["shuffle"] = shuffle
        return torch.utils.data.DataLoader(dataset, **kwargs)

    @staticmethod
    def _build_activation_function(
        activation_function: _ACTIVATION_FUNCTION_TYPE,
    ) -> torch.nn.Module:
        """
        Instantiate the activation function with the given name.

        Raises
        ------
        ValueError
            If the name is not a key of ``_ACTIVATION_FUNCTIONS``.
        """
        if activation_function in BaseNeuralDetector._ACTIVATION_FUNCTIONS:
            return BaseNeuralDetector._ACTIVATION_FUNCTIONS[activation_function]()
        raise ValueError(
            f"Invalid activation function given: '{activation_function}'. Valid options are {list(BaseNeuralDetector._ACTIVATION_FUNCTIONS.keys())}"
        )

    def _build_optimizer(
        self, model_parameters: _MODEL_PARAMETERS_TYPE
    ) -> torch.optim.Optimizer:
        """
        Build the optimizer for the given model parameters.

        A callable ``self.optimizer`` is invoked with the parameters only (the
        learning rate is not passed on); a named optimizer is created with
        ``lr=self.learning_rate``.

        Raises
        ------
        ValueError
            If ``self.optimizer`` is neither callable nor a known name.
        """
        if callable(self.optimizer):
            return self.optimizer(model_parameters)
        if self.optimizer in self._OPTIMIZERS:
            return self._OPTIMIZERS[self.optimizer](
                model_parameters, lr=self.learning_rate
            )
        raise ValueError(
            f"Invalid optimizer given: '{self.optimizer}'. Valid values are {list(self._OPTIMIZERS.keys())} or a callable."
        )

    def _train(self, data_loader: torch.utils.data.DataLoader) -> None:
        """
        Train the network for ``n_epochs`` epochs and restore the weights
        snapshotted after the epoch with the lowest accumulated loss.
        """
        # Set in train mode
        self.neural_network_.train(True)

        # Initialize variables to keep track of the state
        best_epoch_loss = torch.inf
        best_state_dict = None

        # Iterate over the epochs
        for _ in range(self.n_epochs):

            # Iterate over the batches
            epoch_loss = 0
            for batch in data_loader:
                epoch_loss += self._train_batch(batch)

            # Update the best model so far
            if epoch_loss <= best_epoch_loss:
                best_epoch_loss = epoch_loss
                # ``state_dict()`` returns tensors that share memory with the
                # live parameters. Without a deep copy the snapshot would be
                # overwritten by later epochs, making the restore a no-op.
                best_state_dict = copy.deepcopy(self.neural_network_.state_dict())

        # Load the best model again. No snapshot exists if no epoch loss ever
        # compared as <= the best loss (e.g., a NaN loss); then the current
        # weights are kept.
        if best_state_dict is not None:
            self.neural_network_.load_state_dict(best_state_dict)

    def _evaluate(self, data_loader: torch.utils.data.DataLoader) -> np.ndarray:
        """
        Compute the decision scores of every item in the data loader.

        Returns
        -------
        np.ndarray
            One-dimensional array with ``len(data_loader.dataset)`` entries,
            filled batch by batch in iteration order.
        """
        # Set in evaluate mode
        self.neural_network_.eval()

        # Initialize array for the decision scores
        decision_scores = np.empty(len(data_loader.dataset))

        # Turn off the gradients
        with torch.no_grad():

            # Compute the decision score for each batch
            idx = 0
            for batch in data_loader:
                batch_scores = self._evaluate_batch(batch).cpu().numpy()
                decision_scores[idx : idx + batch_scores.shape[0]] = batch_scores
                idx += batch_scores.shape[0]

        # Return the computed decision score
        return decision_scores

    def _fit(self, X: np.ndarray, y: np.ndarray = None, **kwargs) -> None:
        """
        Fit the detector: set the seed, compute the window size, build the
        data loader, network and optimizer, optionally compile the network,
        and train it. ``y`` is not used by this method.
        """
        # Set the seed
        self._set_seed()

        # Compute the window size
        self.window_size_ = compute_window_size(X, self.window_size, **kwargs)

        # Build the neural network
        data_loader = self._build_data_loader(self._build_dataset(X))
        self.neural_network_ = self._build_architecture(
            n_attributes=utils.get_dimension(X)
        ).to(self.device)
        self.optimizer_ = self._build_optimizer(
            model_parameters=self.neural_network_.parameters()
        )

        # Compile the model
        if self.compile_model:
            self.neural_network_.compile(mode=self.compile_mode)

        # Train the network
        self._train(data_loader)

    def _decision_function(self, X: np.ndarray) -> np.ndarray:
        """
        Compute the decision scores of the given time series.

        The data is evaluated without shuffling, after which the computed
        scores are passed through ``reverse_sliding_window`` together with the
        window size, the stride and ``X.shape[0]``.
        """
        # Build the data loader; the order must be preserved for evaluation
        data_loader = self._build_data_loader(self._build_dataset(X), shuffle=False)

        # Evaluate the model
        decision_scores = self._evaluate(data_loader)

        # Format the decision scores
        decision_scores = reverse_sliding_window(
            decision_scores, self.window_size_, self.stride, X.shape[0]
        )

        # Return the decision scores
        return decision_scores