import random
from typing import Literal
import numpy as np
import torch
from sklearn.neighbors import NearestNeighbors
from dtaianomaly.anomaly_detection._BaseDetector import BaseDetector, Supervision
from dtaianomaly.anomaly_detection._BaseNeuralDetector import (
ACTIVATION_FUNCTION_TYPE,
ACTIVATION_FUNCTIONS,
BaseNeuralDetector,
)
from dtaianomaly.type_validation import (
FloatAttribute,
IntegerAttribute,
ListAttribute,
LiteralAttribute,
NoneAttribute,
WindowSizeAttribute,
)
from dtaianomaly.windowing import (
WINDOW_SIZE_TYPE,
compute_window_size,
reverse_sliding_window,
sliding_window,
)
__all__ = ["HybridKNearestNeighbors"]
[docs]
class HybridKNearestNeighbors(BaseDetector):
"""
Anomaly detection based on a hybrid K-NN with AutoEncoder embedding :cite:`song2017hybrid`.
Combine an autoencoder model to learn a latent space representation of the subsequences
with an ensemble of K-NN instances. At training, an autoencoder is fitted using subsequences
from the training time series to embed them into a latent space. Then, the latent space
embeddings are split into multiple subsets in a bagging-like manner. For each subset, a
K-NN instance is initialized, and the average K-th nearest neighbor distance of each sample
is computed across all the subsets. At prediction time, the autoencoder creates the latent
space embedding of the sequences, and the average K-th nearest neighbor distance of each
test-sequence across all subsets is computed. The anomaly score of a test-sequence in regard
to a subset is then computed as the proportion of samples in the subset that have
a smaller average distance. The final anomaly score equals the average anomaly score
across all subsets.
Parameters
----------
window_size : int or str
The window size to use for extracting sliding windows from the time series. This
value will be passed to :py:meth:`~dtaianomaly.anomaly_detection.compute_window_size`.
stride : int, default=1
The stride, i.e., the step size for extracting sliding windows from the time series.
n_neighbors : int, default=5
The number of neighbors to use for the nearest neighbor queries.
n_estimators : int, default=100
The number of K-NN instance and consequently subsets to use.
max_samples : int or float, default='auto'
The number of samples to draw for each subset:
- if ``int``: Draw at most ``max_samples`` samples.
- if ``float``: Draw at most ``max_samples`` percentage of the samples.
- if ``'auto'``: Set ``max_samples=n_windows/n_estimators``.
metric : str, default='euclidean'
Distance metric for distance computations. Any metric of scikit-learn and
scipy.spatial.distance can be used.
hidden_layer_dimensions : list of ints, default=[64]
The number of neurons in each hidden layer of the encoder and decoder. The given list
equals the ordered sequence of neurons in the encoder. The layers in the decoder has
the same dimensions but mirrored.
latent_space_dimension : int default=16
The dimension of the latent space.
activation_function : {"linear", "relu", "sigmoid", "tanh"} default="relu"
The activation function to use at the end of each layer.
batch_size : int, default=32
The size of the batches to feed to the network.
n_epochs : int, default=5
The number of epochs for which the neural network should be trained.
learning_rate : float, default=1e-3
The learning rate to use for training the network.
device : str, default="cpu"
The device on which te neural network should be trained.
For more information, see: https://docs.pytorch.org/docs/stable/tensor_attributes.html#torch-device.
seed : int, default=None
The seed used for training the autoencoder and sampling the subsets.
Attributes
----------
window_size_ : int
The effectively used window size for this anomaly detector
nearest_neighbors_ : list[NearestNeighbors]
The scikit-learn nearest neighbor instances for each subset
g_ : list[np.ndarray]
For each subset, a vector containing the average distance of each
sample within the subset to its K-th nearest neighbor across all
other subsets.
auto_encoder_ : torch.nn.Module
The auto encoder used to embed the windows in the time series.
Notes
-----
- :cite:t:`song2017hybrid` assigns a binary anomaly score for a test-sequence
with regards to a subset by checking if the number of sequences within the
subset that have a greater distance than the test-sequence exceeds some
predefined threshold :math:`\\alpha` (Equation 10). We drop this part to
reduce the number of parameters and to allow for a more fine-grained anomaly
score computation.
- Currently, a very simple feed-forward auto encoder is implemented. If you want
to use a more advanced model, you can extend this class and overwrite the
:py:meth:`~dtaianomaly.anomaly_detection.HybridKNearestNeighbors.build_auto_encoder`
which returns a torch.nn.Module with ``fit(windows)`` and a ``encode(windows)``
methods. The given windows are those computed by :py:meth:`~dtaianomaly.windowing.sliding_window`.
Examples
--------
>>> from dtaianomaly.anomaly_detection import HybridKNearestNeighbors
>>> from dtaianomaly.data import demonstration_time_series
>>> x, y = demonstration_time_series()
>>> hybrid_knn = HybridKNearestNeighbors(64, seed=0).fit(x)
>>> hybrid_knn.decision_function(x) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
array([0.12284644, 0.38202247, 0.43220974, ..., 0.83470662, 0.81722846,
0.85243446]...)
"""
window_size: WINDOW_SIZE_TYPE
stride: int
n_neighbors: int
n_estimators: int
max_samples: float | int | Literal["auto"]
metric: str
hidden_layer_dimensions: list[int]
latent_space_dimension: int
activation_function: ACTIVATION_FUNCTION_TYPE
n_epochs: int
learning_rate: float
device: str
seed: int | None
window_size_: int
nearest_neighbors_: list[NearestNeighbors]
g_: list[np.ndarray]
auto_encoder_: torch.nn.Module
attribute_validation = {
"window_size": WindowSizeAttribute(),
"stride": IntegerAttribute(1),
"n_neighbors": IntegerAttribute(minimum=1),
"n_estimators": IntegerAttribute(minimum=1),
"max_samples": IntegerAttribute(minimum=1)
| FloatAttribute(0.0, 1.0, inclusive_minimum=False)
| LiteralAttribute("auto"),
"hidden_layer_dimensions": ListAttribute(IntegerAttribute(minimum=1)),
"latent_space_dimension": IntegerAttribute(minimum=1),
"activation_function": LiteralAttribute(ACTIVATION_FUNCTIONS),
"batch_size": IntegerAttribute(minimum=1),
"n_epochs": IntegerAttribute(minimum=1),
"learning_rate": FloatAttribute(minimum=0.0, inclusive_minimum=False),
"seed": IntegerAttribute() | NoneAttribute(),
}
def __init__(
self,
window_size: WINDOW_SIZE_TYPE,
stride: int = 1,
n_neighbors: int = 5,
n_estimators: int = 3,
max_samples: float | int = "auto",
metric: str = "euclidean",
hidden_layer_dimensions: list[int] = (64,),
latent_space_dimension: int = 16,
activation_function: ACTIVATION_FUNCTION_TYPE = "relu",
batch_size: int = 32,
n_epochs: int = 5,
learning_rate: float = 1e-3,
device: str = "cpu",
seed: int = None,
):
super().__init__(Supervision.SEMI_SUPERVISED)
self.window_size = window_size
self.stride = stride
self.n_neighbors = n_neighbors
self.n_estimators = n_estimators
self.max_samples = max_samples
self.metric = metric
self.hidden_layer_dimensions = list(hidden_layer_dimensions)
self.latent_space_dimension = latent_space_dimension
self.activation_function = activation_function
self.batch_size = batch_size
self.n_epochs = n_epochs
self.learning_rate = learning_rate
self.device = device
self.seed = seed
def _fit(self, X: np.ndarray, y: np.ndarray = None, **kwargs) -> None:
# Compute the window size
self.window_size_ = compute_window_size(X, self.window_size, **kwargs)
windows = sliding_window(X, self.window_size_, self.stride)
self.auto_encoder_ = self.build_auto_encoder(windows)
self.auto_encoder_.fit(windows)
X_ = self.auto_encoder_.encode(windows)
# Create the subsets
subsets = self._create_subsets(X_)
# Fit the nearest neighbor instances
self.nearest_neighbors_ = [
NearestNeighbors(n_neighbors=self.n_neighbors, metric=self.metric).fit(
subset
)
for subset in subsets
]
# Compute the g-array
self.g_ = [self._compute_g(subset).reshape(1, -1) for subset in subsets]
def _decision_function(self, X: np.ndarray) -> np.array:
# Compute the windows
windows = sliding_window(X, self.window_size_, self.stride)
X_ = self.auto_encoder_.encode(windows)
# Compute the G-values
g = self._compute_g(X_).reshape(-1, 1)
# Compute the p-values (Equation 9)
p = np.array(
[
# The paper states less_equal, but then detects anomalies based on the smallest values.
# We compute greater to have higher scores for more anomalous points.
np.greater(g, self.g_[i]).mean(axis=1)
for i in range(self.n_estimators)
]
)
# Apply reverse sliding window
return reverse_sliding_window(
p.mean(axis=0), self.window_size_, self.stride, X.shape[0]
)
def _create_subsets(self, X: np.ndarray) -> list[np.ndarray]:
rng = np.random.default_rng(self.seed)
if self.max_samples == "auto":
nb_samples = X.shape[0] / self.n_estimators
elif isinstance(self.max_samples, float):
nb_samples = self.max_samples * X.shape[0]
else:
nb_samples = self.max_samples
return [
X[rng.choice(X.shape[0], size=int(nb_samples), replace=False)]
for _ in range(self.n_estimators)
]
def _compute_g(self, X: np.ndarray) -> np.array:
# Equation 8
return np.array(
[
nearest_neighbors.kneighbors(X)[0][:, -1]
for nearest_neighbors in self.nearest_neighbors_
]
).mean(axis=0)
[docs]
def build_auto_encoder(self, windows: np.ndarray) -> torch.nn.Module:
"""
Build an auto encoder.
Build an auto encoder module that takes as input the given windows
and learns to reconstruct them.
Parameters
----------
windows : array-like of shape (n_windows, n_attributes x window_size)
The windows that will be fed to the auto encoder.
Returns
-------
torch.nn.Module
Returns a torch neural network module which will take as input
the windows and learns to reconstruct them. The torch module
has a ``.fit(windows)`` method to learn the weights and a
``.encode(windows)`` method to transform the windows into a
latent space embedding.
"""
return _AutoEncoder(
input_size=windows.shape[1],
hidden_layer_dimensions=self.hidden_layer_dimensions,
latent_space_dimension=self.latent_space_dimension,
activation_function=self.activation_function,
batch_size=self.batch_size,
n_epochs=self.n_epochs,
learning_rate=self.learning_rate,
device=self.device,
seed=self.seed,
)
class _AutoEncoder(torch.nn.Module):
encoder: torch.nn.Sequential
decorator: torch.nn.Sequential
batch_size: int
n_epochs: int
learning_rate: float
device: str
seed: int | None
def __init__(
self,
input_size: int,
hidden_layer_dimensions: list[int],
latent_space_dimension: int,
activation_function: ACTIVATION_FUNCTION_TYPE,
batch_size: int,
n_epochs: int,
learning_rate: float,
device: str,
seed: int | None,
):
super().__init__()
self.seed = seed
self._set_seed()
encoder_layers = []
prev_d = input_size
for d in hidden_layer_dimensions:
encoder_layers.append(torch.nn.Linear(prev_d, d))
encoder_layers.append(
BaseNeuralDetector._build_activation_function(activation_function)
)
prev_d = d
encoder_layers.append(
torch.nn.Linear(hidden_layer_dimensions[-1], latent_space_dimension)
)
self.encoder = torch.nn.Sequential(*encoder_layers)
decoder_layers = []
prev_d = latent_space_dimension
for d in reversed(hidden_layer_dimensions):
decoder_layers.append(torch.nn.Linear(prev_d, d))
decoder_layers.append(
BaseNeuralDetector._build_activation_function(activation_function)
)
prev_d = d
decoder_layers.append(torch.nn.Linear(hidden_layer_dimensions[0], input_size))
self.decoder = torch.nn.Sequential(*decoder_layers)
self.batch_size = batch_size
self.n_epochs = n_epochs
self.learning_rate = learning_rate
self.device = device
self.to(device)
def _set_seed(self):
if self.seed is not None:
random.seed(self.seed)
np.random.seed(self.seed)
torch.manual_seed(self.seed)
torch.cuda.manual_seed_all(self.seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
def forward(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return decoded
def encode(self, windows: np.ndarray) -> np.ndarray:
with torch.no_grad():
x = torch.Tensor(windows).to(self.device)
encoded = self.encoder(x)
return encoded.cpu().numpy()
def fit(self, windows: np.ndarray):
self._set_seed()
optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
criterion = torch.nn.MSELoss()
dataset = torch.utils.data.TensorDataset(torch.Tensor(windows))
dataloader = torch.utils.data.DataLoader(dataset, batch_size=self.batch_size)
self.train()
for epoch in range(self.n_epochs):
for batch in dataloader:
batch = batch[0].to(self.device)
self.zero_grad()
loss = criterion(self.forward(batch), batch)
loss.backward()
optimizer.step()
self.eval()