Skip to content

Commit 6d9dbba

Browse files
committed
Added whole-series ROCKAD anomaly detector
1 parent da35462 commit 6d9dbba

File tree

1 file changed

+221
-0
lines changed

1 file changed

+221
-0
lines changed
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
"""ROCKAD anomaly detector."""
2+
3+
__all__ = ["ROCKAD"]
4+
5+
import warnings
6+
from typing import Optional
7+
8+
import numpy as np
9+
from sklearn.neighbors import NearestNeighbors
10+
from sklearn.preprocessing import PowerTransformer
11+
from sklearn.utils import resample
12+
13+
from aeon.anomaly_detection.whole_series.base import BaseCollectionAnomalyDetector
14+
from aeon.transformations.collection.convolution_based import Rocket
15+
16+
17+
class ROCKAD(BaseCollectionAnomalyDetector):
    """
    ROCKET-based whole-series Anomaly Detector (ROCKAD).

    ROCKAD [1]_ leverages the ROCKET transformation for feature extraction from
    time series data and applies the scikit-learn k-nearest neighbours (k-NN)
    approach with bootstrap aggregation for robust semi-supervised anomaly
    detection. The data is transformed into the ROCKET feature space, and whole
    series are then compared in that feature space by finding their nearest
    neighbours. The time-point based ROCKAD anomaly detector can be found at
    aeon/anomaly_detection/_rockad.py

    This class supports both univariate and multivariate time series and
    provides options for normalising features, applying power transformations,
    and customising the distance metric.

    Parameters
    ----------
    n_estimators : int, default=10
        Number of k-NN estimators to use in the bootstrap aggregation.
    n_kernels : int, default=100
        Number of kernels to use in the ROCKET transformation.
    normalise : bool, default=False
        Whether to normalise the ROCKET-transformed features.
    n_neighbors : int, default=5
        Number of neighbours to use for the k-NN algorithm. Must not exceed the
        number of training cases.
    metric : str, default="euclidean"
        Distance metric to use for the k-NN algorithm. A KD-tree is used when
        the metric supports it; otherwise scikit-learn chooses the algorithm.
    power_transform : bool, default=True
        Whether to apply a power transformation (Yeo-Johnson) to the features.
    n_jobs : int, default=1
        Number of parallel jobs to use for the k-NN algorithm and ROCKET
        transformation.
    random_state : int or None, default=42
        Random seed for reproducibility. With an integer seed, the bootstrap
        sample of estimator ``i`` is drawn with seed ``random_state + i``.
        ``None`` gives non-reproducible bootstrap samples.

    Attributes
    ----------
    rocket_transformer_ : Optional[Rocket]
        Instance of the ROCKET transformer used to extract features, set after
        fitting.
    list_baggers_ : Optional[list[NearestNeighbors]]
        List containing k-NN estimators used for anomaly scoring, set after
        fitting.
    power_transformer_ : Optional[PowerTransformer]
        Transformer used to apply the power transformation to the features.
        ``None`` when ``power_transform`` is False or the transformation failed
        during fitting.

    References
    ----------
    .. [1] Theissler, A., Wengert, M., Gerschner, F. (2023).
        ROCKAD: Transferring ROCKET to Whole Time Series Anomaly Detection.
        In: Crémilleux, B., Hess, S., Nijssen, S. (eds) Advances in Intelligent
        Data Analysis XXI. IDA 2023. Lecture Notes in Computer Science,
        vol 13876. Springer, Cham. https://doi.org/10.1007/978-3-031-30047-9_33

    Examples
    --------
    >>> import numpy as np
    >>> from aeon.anomaly_detection.whole_series import ROCKAD
    >>> rng = np.random.default_rng(seed=42)
    >>> X_train = rng.normal(loc=0.0, scale=1.0, size=(10, 100))
    >>> X_test = rng.normal(loc=0.0, scale=1.0, size=(5, 100))
    >>> X_test[4][50:58] -= 5
    >>> detector = ROCKAD()
    >>> detector.fit(X_train)  # doctest: +ELLIPSIS
    ROCKAD(...)
    >>> detector.predict(X_test)
    array([24.11974147, 23.93866453, 21.3941765 , 22.26811959, 64.9630108 ])
    """

    _tags = {
        "capability:univariate": True,
        "capability:multivariate": True,
        "capability:missing_values": False,
        "capability:multithreading": True,
        "fit_is_empty": False,
    }

    def __init__(
        self,
        n_estimators=10,
        n_kernels=100,
        normalise=False,
        n_neighbors=5,
        metric="euclidean",
        power_transform=True,
        n_jobs=1,
        random_state=42,
    ):

        self.n_estimators = n_estimators
        self.n_kernels = n_kernels
        self.normalise = normalise
        self.n_neighbors = n_neighbors
        self.n_jobs = n_jobs
        self.metric = metric
        self.power_transform = power_transform
        self.random_state = random_state

        self.rocket_transformer_: Optional[Rocket] = None
        self.list_baggers_: Optional[list[NearestNeighbors]] = None
        self.power_transformer_: Optional[PowerTransformer] = None

        super().__init__()

    def _fit(self, X: np.ndarray, y: Optional[np.ndarray] = None) -> "ROCKAD":
        """Fit the ROCKET transformer and the bagged k-NN estimators on X.

        Parameters
        ----------
        X : np.ndarray
            Collection of training time series.
        y : np.ndarray or None, default=None
            Ignored.

        Returns
        -------
        ROCKAD
            The fitted detector (``self``).
        """
        self._inner_fit(X)
        return self

    def _bootstrap_seed(self, idx_estimator: int):
        """Return the seed used to draw the bootstrap sample of one estimator.

        Integer seeds are offset by the estimator index so that every estimator
        sees a different, reproducible sample. Any other value (e.g. ``None``)
        cannot be offset and is passed through unchanged.
        """
        if isinstance(self.random_state, (int, np.integer)):
            return self.random_state + idx_estimator
        return self.random_state

    def _inner_fit(self, X: np.ndarray) -> None:
        """Build the feature pipeline and fit one k-NN estimator per bootstrap.

        Parameters
        ----------
        X : np.ndarray
            Collection of training time series.

        Raises
        ------
        ValueError
            If ``n_neighbors`` is larger than the number of training cases.
        """
        # Local import: only needed here to pick a compatible search algorithm.
        from sklearn.neighbors import VALID_METRICS

        # Reset fitted state so that a refit never reuses stale components.
        self.power_transformer_ = None
        self.list_baggers_ = []

        self.rocket_transformer_ = Rocket(
            n_kernels=self.n_kernels,
            normalise=self.normalise,
            n_jobs=self.n_jobs,
            random_state=self.random_state,
        )
        # Xt: (n_cases, n_kernels*2)
        Xt = self.rocket_transformer_.fit_transform(X)
        Xt = Xt.astype(np.float64)

        # Each bootstrap sample has exactly n_cases rows, so a larger
        # n_neighbors could never be satisfied at prediction time. Fail early
        # with a clear message instead of deep inside kneighbors().
        n_cases = Xt.shape[0]
        if self.n_neighbors > n_cases:
            raise ValueError(
                f"n_neighbors ({self.n_neighbors}) must not exceed the number of "
                f"training cases ({n_cases})."
            )

        Xtp = Xt
        if self.power_transform:
            power_transformer = PowerTransformer()
            try:
                Xtp = power_transformer.fit_transform(Xt)
            except Exception:
                # Deliberate best effort: the power transform is optional, so a
                # failure disables it instead of aborting the fit.
                warnings.warn(
                    "Power Transform failed and thus has been disabled. ",
                    UserWarning,
                    stacklevel=2,
                )
                Xtp = Xt
            else:
                self.power_transformer_ = power_transformer

        # A KD-tree only supports a subset of metrics (and no callables); fall
        # back to scikit-learn's automatic choice for everything else.
        if self.metric in VALID_METRICS["kd_tree"]:
            algorithm = "kd_tree"
        else:
            algorithm = "auto"

        for idx_estimator in range(self.n_estimators):
            estimator = NearestNeighbors(
                n_neighbors=self.n_neighbors,
                n_jobs=self.n_jobs,
                metric=self.metric,
                algorithm=algorithm,
            )
            # Bootstrap aggregation: sample n_cases rows with replacement.
            Xtp_sample = resample(
                Xtp,
                replace=True,
                n_samples=None,
                random_state=self._bootstrap_seed(idx_estimator),
                stratify=None,
            )

            estimator.fit(Xtp_sample)
            self.list_baggers_.append(estimator)

    def _predict(self, X: np.ndarray) -> np.ndarray:
        """Return one anomaly score per time series in X.

        Parameters
        ----------
        X : np.ndarray
            Collection of time series to score.

        Returns
        -------
        np.ndarray
            Anomaly score of each series.
        """
        return self._inner_predict(X)

    def _inner_predict(self, X: np.ndarray) -> np.ndarray:
        """
        Return the anomaly scores for the input data.

        Parameters
        ----------
        X : np.ndarray
            Collection of time series to score.

        Returns
        -------
        np.ndarray
            Anomaly score of each series: the mean distance to its nearest
            neighbours, averaged over all bagged estimators.
        """
        # Transform into the ROCKET feature space.
        # Xt: (n_cases, n_kernels*2)
        Xt = self.rocket_transformer_.transform(X)
        Xt = Xt.astype(np.float64)

        if self.power_transformer_ is not None:
            # Power transform using Yeo-Johnson, as fitted on the training data.
            Xtp = self.power_transformer_.transform(Xt)
        else:
            Xtp = Xt

        # One column of scores per fitted estimator.
        y_scores = np.zeros((len(X), len(self.list_baggers_)))

        for idx, bagger in enumerate(self.list_baggers_):
            distances, _ = bagger.kneighbors(Xtp)
            # Mean distance of each series to its nearest training neighbours.
            y_scores[:, idx] = distances.mean(axis=1)

        # Average over the estimators to get the final score of each series.
        return y_scores.mean(axis=1)

0 commit comments

Comments
 (0)