-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathocsvm.py
More file actions
59 lines (47 loc) · 2.45 KB
/
ocsvm.py
File metadata and controls
59 lines (47 loc) · 2.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
"""An implementation of the One-Class SVM shallow model."""
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.svm import OneClassSVM
from .interface.algorithm_config import *
from .interface.algorithm_information import *
from .interface.algorithm_interface import *
from .util.shallow_model_util import remove_seasonality, post_processing
class Algorithm(AlgorithmInterface):
    """One-Class SVM anomaly detector exposed through ``AlgorithmInterface``."""

    def __init__(self) -> None:
        """Create the estimator, the scaler and the static metadata objects."""
        # RBF-kernel One-Class SVM with fixed hyperparameters.
        self.model = OneClassSVM(kernel="rbf", gamma=0.01, nu=0.01)
        # Re-fitted on every call to ``calc_anomaly_score``.
        self.scaler = StandardScaler()
        self.info = AlgorithmInformation(
            name="One-Class SVM",
            deep=False,
            explainable=False,
        )
        self.config = AlgorithmConfig()

    @property
    def information(self) -> AlgorithmInformation:
        """Expose the metadata object describing this algorithm.

        Returns:
            The AlgorithmInformation instance created in the constructor.
        """
        return self.info

    @property
    def configuration(self) -> AlgorithmConfig:
        """Expose the configuration object of this algorithm.

        Returns:
            The AlgorithmConfig instance created in the constructor.
        """
        return self.config

    def calc_anomaly_score(
        self,
        data: pd.DataFrame,
        building: str,
        config: dict,
    ) -> tuple[list, list, list, float]:
        """Fit the One-Class SVM on a data slice and score that same slice.

        The method works in four steps:
            1. ``remove_seasonality`` is applied to every column and rows containing NaN are dropped.
            2. The remaining rows are standardized with the instance's scaler, which is re-fitted here.
            3. The model is re-fitted on the scaled rows and its decision function is evaluated on them.
            4. The index labels of ``data``, minus the first and last 336 rows, are collected and passed
               to ``post_processing`` together with the scores.

        Args:
            data: A dataframe containing a sensor data slice.
            building: The name of the building from which the data originates. Not read by this method.
            config: The user specified configuration data. Not read by this method.

        Returns:
            The value produced by ``post_processing``; according to the annotation this is an empty deep
            anomaly score, the calculated anomaly scores, the timestamps and the threshold.
        """
        # NOTE(review): presumably the number of rows remove_seasonality leaves as NaN at each end
        # of the slice, so that the labels line up with the scores - confirm against the helper.
        edge_rows = 336

        deseasonalized = data.apply(remove_seasonality, axis=0)
        features = self.scaler.fit_transform(deseasonalized.dropna())

        # ``fit`` returns the estimator itself, so scoring can be chained directly.
        scores = self.model.fit(features).decision_function(features)

        timestamps = data.index[edge_rows:-edge_rows].tolist()
        return post_processing(scores, timestamps)