# Source code for dtaianomaly.anomaly_detection._Chronos

import os
import tempfile
from typing import Any, Literal

import numpy as np
import pandas as pd

from dtaianomaly import utils
from dtaianomaly.anomaly_detection._BaseDetector import BaseDetector, Supervision
from dtaianomaly.type_validation import (
    BoolAttribute,
    IntegerAttribute,
    LiteralAttribute,
    WindowSizeAttribute,
)
from dtaianomaly.windowing import WINDOW_SIZE_TYPE, compute_window_size

# Names exported by ``from <this module> import *``: the detector class and
# the list of accepted ``model_path`` values.
__all__ = ["Chronos", "MODEL_PATHS"]


# Static type of the ``model_path`` parameter. Must list exactly the same
# names as ``MODEL_PATHS`` (the runtime counterpart used for validation).
# Each name is its own string literal: a fused entry such as
# "bolt_tiny, bolt_mini" would silently drop both names from the type.
MODEL_PATH_TYPE = Literal[
    "tiny",
    "mini",
    "small",
    "base",
    "large",
    "bolt_tiny",
    "bolt_mini",
    "bolt_small",
    "bolt_large",
]
# Runtime list of every accepted ``model_path`` value (mirrors
# ``MODEL_PATH_TYPE``), grouped by naming family.
MODEL_PATHS = [
    # Plain size names
    *("tiny", "mini", "small", "base", "large"),
    # "bolt_"-prefixed names
    *("bolt_tiny", "bolt_mini", "bolt_small", "bolt_large"),
]


class Chronos(BaseDetector):
    """
    Detect anomalies in time series using Chronos :cite:`ansari2024chronos`.

    Chronos is a pre-trained time series foundation model for time series
    forecasting. By computing the difference between the forecasted values
    and the actual observations, Chronos is able to detect anomalies in
    time series. Chronos first bins the incoming time series sequences to
    obtain a finite set of values, after which an encoder-decoder model is
    used to forecast the sequence. The network is trained with a
    cross-entropy loss, using a combination of real, synthetic and
    semi-synthetic data.

    Parameters
    ----------
    window_size : int or str
        The window size to use for extracting sliding windows from the time
        series. This value will be passed to
        :py:meth:`~dtaianomaly.anomaly_detection.compute_window_size`.
    model_path : {'tiny', 'mini', 'small', 'base', 'large', 'bolt_tiny', 'bolt_mini', 'bolt_small', 'bolt_large'}, default='bolt_small'
        The Chronos-model to use for forecasting.
    batch_size : int, default=16
        The number of windows to feed simultaneously to Chronos, within a batch.
    forecast_horizon : int, default=1
        The number of samples to forecast for each window.
    do_fine_tuning : bool, default=False
        Whether to fine tune the model during fitting. If False, then the
        model will perform zero-shot forecasting.
    fine_tune_kwargs : dict, default=None
        Additional arguments for finetuning Chronos. Check out
        https://auto.gluon.ai/dev/tutorials/timeseries/forecasting-model-zoo.html#autogluon.timeseries.models.ChronosModel
        for more information on the options.
    device : str, default='cpu'
        The device to use for running Chronos.

    Attributes
    ----------
    window_size_ : int
        The effectively used window size for this anomaly detector
    chronos_ : autogluon.timeseries.TimeSeriesPredictor
        The Chronos model used for forecasting the time series

    Warnings
    --------
    If you want to run Chronos, be sure to install the optional dependency ``chronos``:

    .. code-block:: bash

        pip install dtaianomaly[chronos]

    Notes
    -----
    Chronos only handles univariate time series.

    Examples
    --------
    >>> from dtaianomaly.anomaly_detection import Chronos  # doctest: +SKIP
    >>> from dtaianomaly.data import demonstration_time_series  # doctest: +SKIP
    >>> x, y = demonstration_time_series()  # doctest: +SKIP
    >>> chronos = Chronos(10).fit(x)  # doctest: +SKIP
    >>> chronos.decision_function(x)  # doctest: +SKIP
    array([0.00027719, 0.00027719, 0.00027719, ..., 0.00058781, 0.02628242,
           0.00010728]...)
    """

    window_size: WINDOW_SIZE_TYPE
    model_path: MODEL_PATH_TYPE
    batch_size: int
    forecast_horizon: int
    do_fine_tuning: bool
    fine_tune_kwargs: dict[str, Any] | None
    device: str
    window_size_: int
    chronos_: Any

    attribute_validation = {
        "window_size": WindowSizeAttribute(),
        "model_path": LiteralAttribute(MODEL_PATHS),
        "batch_size": IntegerAttribute(minimum=1),
        "forecast_horizon": IntegerAttribute(minimum=1),
        "do_fine_tuning": BoolAttribute(),
    }

    def __init__(
        self,
        window_size: WINDOW_SIZE_TYPE,
        model_path: MODEL_PATH_TYPE = "bolt_small",
        batch_size: int = 16,
        forecast_horizon: int = 1,
        do_fine_tuning: bool = False,
        fine_tune_kwargs: dict[str, Any] | None = None,
        device: str = "cpu",
    ):
        # Fail early, at construction time, when the optional dependency is
        # missing. The import itself is only an availability probe.
        try:
            import autogluon.timeseries  # noqa: F401
        except ImportError as err:
            # Chain the original error so the real import failure stays visible.
            raise Exception(
                "Module 'autogluon.timeseries' is not available, make sure you install it before using Chronos!"
            ) from err

        super().__init__(Supervision.UNSUPERVISED)
        self.window_size = window_size
        self.model_path = model_path
        self.batch_size = batch_size
        self.forecast_horizon = forecast_horizon
        self.do_fine_tuning = do_fine_tuning
        self.fine_tune_kwargs = fine_tune_kwargs
        self.device = device

    def _fit(self, X: np.ndarray, y: np.ndarray | None = None, **kwargs) -> None:
        """
        Compute the window size and set up (optionally fine-tune) Chronos.

        Parameters
        ----------
        X : np.ndarray
            The univariate time series to fit on.
        y : np.ndarray, default=None
            Not used by this method.
        **kwargs
            Forwarded to ``compute_window_size``.

        Raises
        ------
        ValueError
            If ``X`` is not univariate, or if it is too short to extract a
            single window of ``window_size_ + forecast_horizon`` samples.
        """
        # Check if the given dataset is univariate
        if not utils.is_univariate(X):
            raise ValueError("Input must be univariate!")

        # Make sure the time series array has only one dimension
        X = X.squeeze()

        # Compute the window size
        self.window_size_ = compute_window_size(X, self.window_size, **kwargs)

        # Initialize the hyperparameters
        hyperparameters = {
            "model_path": self.model_path,
            "device": self.device,
            "batch_size": self.batch_size,
        }

        # Enable fine-tuning
        if self.do_fine_tuning:
            hyperparameters["fine_tune"] = True
            hyperparameters.update(self.fine_tune_kwargs or {})

        # Initialize the chronos model
        from autogluon.timeseries import TimeSeriesPredictor

        # mkdtemp is only used to obtain a unique path; the directory itself
        # is removed again so the predictor can create it.
        tmp_dir = tempfile.mkdtemp(prefix="chronos")
        os.rmdir(tmp_dir)  # To prevent warning that the dir already exists
        self.chronos_ = TimeSeriesPredictor(
            prediction_length=self.forecast_horizon, path=tmp_dir
        )
        # Training windows include the horizon, so each training series holds
        # both the context and the values that follow it.
        self.chronos_.fit(
            self._format_data(X),
            hyperparameters={"Chronos": hyperparameters},
            skip_model_selection=True,
            enable_ensemble=False,
            verbosity=0,
        )

    def _decision_function(self, X: np.ndarray) -> np.ndarray:
        """
        Score each observation by the forecasting error of Chronos.

        For every window start ``i``, the score at index ``i + window_size_``
        is the mean squared error between the forecasted ``"mean"`` values
        and the observations ``X[i + window_size_ : i + window_size_ +
        forecast_horizon]``. The first ``window_size_`` positions copy the
        first computed score, and the last ``forecast_horizon - 1`` positions
        copy the last computed score.

        Parameters
        ----------
        X : np.ndarray
            The univariate time series to score.

        Returns
        -------
        np.ndarray
            One score per observation in ``X``.

        Raises
        ------
        ValueError
            If ``X`` is not univariate, or if it is shorter than
            ``window_size_ + forecast_horizon`` samples.
        """
        # Check if the given dataset is univariate
        if not utils.is_univariate(X):
            raise ValueError("Input must be univariate!")

        # Make sure the time series array has only one dimension
        X = X.squeeze()

        # Raises ValueError for too-short input, so every entry of the
        # np.empty buffer below is guaranteed to be overwritten.
        forecasts = self.make_forecasts(X)

        window = self.window_size_
        horizon = self.forecast_horizon
        decision_scores = np.empty(shape=X.shape[0])
        for group, df in forecasts.groupby("item_id"):
            # ``group`` is the start index of the window within X.
            start = group + window
            forecast = df.sort_index()["mean"].values
            actual = X[start : start + horizon]
            decision_scores[start] = np.mean((forecast - actual) ** 2)

        # Pad the positions for which no forecast can be evaluated.
        decision_scores[:window] = decision_scores[window]
        if horizon > 1:
            decision_scores[-horizon + 1 :] = decision_scores[-horizon]

        return decision_scores

    def make_forecasts(self, X: np.ndarray):
        """
        Forecast the ``forecast_horizon`` samples following each window.

        Only the ``window_size_`` context samples of each window are passed
        to the model. The predictor forecasts the steps after the end of the
        series it is given, so the forecast for window ``i`` lines up with
        ``X[i + window_size_ : i + window_size_ + forecast_horizon]``.

        Parameters
        ----------
        X : np.ndarray
            One-dimensional time series.

        Returns
        -------
        The object returned by ``self.chronos_.predict``, with one forecast
        per window.

        Raises
        ------
        ValueError
            If ``X`` is shorter than ``window_size_ + forecast_horizon``.
        """
        return self.chronos_.predict(self._format_data(X, include_horizon=False))

    def _format_data(self, X: np.ndarray, *, include_horizon: bool = True):
        """
        Convert a 1-D series into a ``TimeSeriesDataFrame`` of sliding windows.

        Window ``i`` starts at ``X[i]`` and becomes the item with
        ``item_id == i``; ``timestamp`` is the position within the window.
        There are ``len(X) - window_size_ - forecast_horizon + 1`` windows
        regardless of ``include_horizon``, so the observations that follow
        every window's context always exist in ``X``.

        Parameters
        ----------
        X : np.ndarray
            One-dimensional time series.
        include_horizon : bool, default=True
            If True, each window has ``window_size_ + forecast_horizon``
            samples (used for fitting). If False, each window has only the
            ``window_size_`` context samples (used for forecasting).

        Returns
        -------
        autogluon.timeseries.TimeSeriesDataFrame
            The windows in long format, with columns ``item_id``,
            ``timestamp`` and ``target``.

        Raises
        ------
        ValueError
            If ``X`` is too short to extract a single window.
        """
        horizon = self.forecast_horizon
        num_windows = X.shape[0] - self.window_size_ - horizon + 1
        if num_windows < 1:
            raise ValueError(
                f"Time series of length {X.shape[0]} is too short: at least "
                f"window_size + forecast_horizon = {self.window_size_ + horizon} "
                "observations are required."
            )

        length = self.window_size_ + (horizon if include_horizon else 0)
        # Row i of the view is X[i : i + length]. Without the horizon the view
        # has more rows than needed, so keep only the first ``num_windows``.
        windows = np.lib.stride_tricks.sliding_window_view(X, length)[:num_windows]
        df = pd.DataFrame(
            {
                "item_id": np.repeat(np.arange(num_windows, dtype=np.int64), length),
                "timestamp": np.tile(np.arange(length, dtype=np.int64), num_windows),
                "target": windows.reshape(-1),
            }
        )

        from autogluon.timeseries import TimeSeriesDataFrame

        return TimeSeriesDataFrame.from_data_frame(df)