Source code for dtaianomaly.anomaly_detection._ConvolutionalNeuralNetwork

import numpy as np
import torch

from dtaianomaly.anomaly_detection._BaseDetector import Supervision
from dtaianomaly.anomaly_detection._BaseNeuralDetector import (
    ACTIVATION_FUNCTION_TYPE,
    ACTIVATION_FUNCTIONS,
    COMPILE_MODE_TYPE,
    LOSS_TYPE,
    OPTIMIZER_TYPE,
)
from dtaianomaly.anomaly_detection._BaseNeuralForecastingDetector import (
    ERROR_METRIC_TYPE,
    BaseNeuralForecastingDetector,
)
from dtaianomaly.type_validation import (
    BoolAttribute,
    IntegerAttribute,
    ListAttribute,
    LiteralAttribute,
)
from dtaianomaly.windowing import WINDOW_SIZE_TYPE

__all__ = ["ConvolutionalNeuralNetwork"]


[docs] class ConvolutionalNeuralNetwork(BaseNeuralForecastingDetector): """ Use a convolutional neural network to detect anomalies. The Convolutional Neural Network (CNN) is a neural network consisting of convolutional layers, each consisting of multiple kernels. Given some input, the convolutional layer computes the convolution of the input with each kernel to create an output. The input sequences are fed through multiple such convolutional layers, and the task is to forecast the time series. Hence, by computing the difference between the forecasted value and the actually observed values, the neural network can detect anomalies. The architecture of the CNN consists of blocks, in which each block applies the following operations: convolutional layer :math:`\\rightarrow` batch normalization :math:`\\rightarrow` activation function :math:`\\rightarrow` average pooling. The first and final layers of the network has no batch normalization. Parameters ---------- window_size : int or str The window size to use for extracting sliding windows from the time series. This value will be passed to :py:meth:`~dtaianomaly.anomaly_detection.compute_window_size`. error_metric : {"mean-absolute-error", "mean-squared-error"}, default="mean-absolute-error" The error measure between the reconstructed window and the original window. forecast_length : int default=1 The number of time steps the neural network must forecast. kernel_size : int, default=3 The size of the kernels in the convolutional layers. hidden_layers : list of ints, default=[64, 32] The number of kernels in each hidden layer. Must contain at least 1 value. activation_function : {"linear", "relu", "sigmoid", "tanh"} default="relu" The activation function to use at the end of each convolutional layer. batch_normalization : bool = True, Whether to add batch normalization after each convolutional layer or not. stride : int, default=1 The stride, i.e., the step size for extracting sliding windows from the time series. standard_scaling : bool, default=True Whether to standard scale each window independently, before feeding it to the network. batch_size : int, default=32 The size of the batches to feed to the network. data_loader_kwargs : dictionary, default=None Additional kwargs to be passed to the data loader. For more information, see: https://docs.pytorch.org/docs/stable/data.html. optimizer : {"adam", "sgd"} or callable default="adam" The optimizer to use for learning the weights. If "adam" is given, then the torch.optim.Adam optimizer will be used. If "sgd" is given, then the torch.optim.SGD optimizer will be used. Otherwise, a callable should be given, which takes as input the network parameters, and then creates an optimizer. learning_rate : float, default=1e-3 The learning rate to use for training the network. Has no effect if optimize is a callable. compile_model : bool, default=False Whether the network architecture should be compiled or not before training the weights. For more information, see: https://docs.pytorch.org/docs/stable/generated/torch.compile.html. compile_mode : {"default", "reduce-overhead", "max-autotune", "max-autotune-no-cudagraphs"}, default="default" Method to compile the architecture. For more information, see: https://docs.pytorch.org/docs/stable/generated/torch.compile.html. n_epochs : int, default=10 The number of epochs for which the neural network should be trained. loss_function : {"mse", "l1", "huber} or torch.nn.Module, default="mse" The loss function to use for updating the weights. Valid options are: - ``'mse'``: Use the Mean Squared Error loss. - ``'l1'``: Use the L1-loss or the mean absolute error. - ``'huber'``: Use the huber loss, which smoothly combines the MSE-loss with the L1-loss. - ``torch.nn.Module``: a custom torch module to use for the loss function. device : str, default="cpu" The device on which te neural network should be trained. For more information, see: https://docs.pytorch.org/docs/stable/tensor_attributes.html#torch-device. seed : int, default=None The seed used for training the model. This seed will update the torch and numpy seed at the beginning of the fit method. Attributes ---------- window_size_ : int The effectively used window size for this anomaly detector. optimizer_ : torch.optim.Optimizer The optimizer used for learning the weights of the network. neural_network_ : torch.nn.Module The PyTorch network architecture. See Also -------- BaseNeuralForecastingDetector: Use a neural network to forecast the time series, and detect anomalies by measuring the difference with the actual observations. Examples -------- >>> from dtaianomaly.anomaly_detection import ConvolutionalNeuralNetwork >>> from dtaianomaly.data import demonstration_time_series >>> x, y = demonstration_time_series() >>> cnn = ConvolutionalNeuralNetwork(10, seed=0).fit(x) >>> cnn.decision_function(x) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE, +SKIP array([0.07708263, 0.07708263, 0.06242053, ..., 0.1827196 , 0.2396274 , 0.06390759]...) """ kernel_size: int hidden_layers: list[int] activation_function: ACTIVATION_FUNCTION_TYPE batch_normalization: bool attribute_validation = { "kernel_size": IntegerAttribute(minimum=1), "hidden_layers": ListAttribute(IntegerAttribute(minimum=1), minimum_length=1), "activation_function": LiteralAttribute(ACTIVATION_FUNCTIONS), "batch_normalization": BoolAttribute(), } def __init__( self, window_size: WINDOW_SIZE_TYPE, error_metric: ERROR_METRIC_TYPE = "mean-absolute-error", forecast_length: int = 1, kernel_size: int = 3, hidden_layers: list[int] = (64, 32), activation_function: ACTIVATION_FUNCTION_TYPE = "relu", batch_normalization: bool = True, stride: int = 1, standard_scaling: bool = True, batch_size: int = 32, data_loader_kwargs: dict[str, any] = None, optimizer: OPTIMIZER_TYPE = "adam", learning_rate: float = 1e-3, compile_model: bool = False, compile_mode: COMPILE_MODE_TYPE = "default", n_epochs: int = 10, loss_function: LOSS_TYPE = "mse", device: str = "cpu", seed: int = None, ): super().__init__( window_size=window_size, supervision=Supervision.SEMI_SUPERVISED, error_metric=error_metric, forecast_length=forecast_length, stride=stride, standard_scaling=standard_scaling, batch_size=batch_size, data_loader_kwargs=data_loader_kwargs, optimizer=optimizer, learning_rate=learning_rate, compile_model=compile_model, compile_mode=compile_mode, n_epochs=n_epochs, loss_function=loss_function, device=device, seed=seed, ) self.kernel_size = kernel_size self.hidden_layers = list(hidden_layers) self.activation_function = activation_function self.batch_normalization = batch_normalization def _build_architecture(self, n_attributes: int) -> torch.nn.Module: # Initialize the CNN cnn = torch.nn.Sequential() padding = int((self.kernel_size - 1) / 2) # Initialize layer inputs and outputs inputs = [n_attributes, *self.hidden_layers] outputs = [*self.hidden_layers, n_attributes * self.forecast_length] for i in range(len(inputs) - 1): # Add the convolutional layer cnn.add_module( f"conv-{i}", torch.nn.Conv1d( in_channels=inputs[i], out_channels=outputs[i], kernel_size=self.kernel_size, stride=1, padding=padding, ), ) # Add batch normalization if self.batch_normalization and 0 < i: cnn.add_module(f"batch-norm-{i}", torch.nn.BatchNorm1d(outputs[i])) # Add the activation function cnn.add_module( f"activation-{i}", self._build_activation_function(self.activation_function), ) # Add a pooling layer cnn.add_module(f"pool-{i}", torch.nn.AvgPool1d(kernel_size=2)) # Add a linear layer cnn.add_module("flatten", torch.nn.Flatten()) channel_correction_term = int( np.floor(self.window_size_ / 2 ** len(self.hidden_layers)) ) cnn.add_module( "linear", torch.nn.Linear( in_features=inputs[-1] * channel_correction_term, out_features=outputs[-1], ), ) # Return the CNN return _CNN(n_attributes, cnn)
class _CNN(torch.nn.Module): n_attributes: int cnn: torch.nn.Sequential def __init__(self, n_attributes: int, cnn: torch.nn.Sequential): super().__init__() self.n_attributes = n_attributes self.cnn = cnn def forward(self, x): x = x.view(x.shape[0], self.n_attributes, -1) x = self.cnn(x) return x