import numpy as np

from abc import ABC
from abc import abstractmethod

import json
import scipy.stats
import pandas as pd

from adcern.publisher import generate_result
from etl.spark_etl.utils import read_json_config
from adcern.publisher import create_id_for_hostrgroups_and_plugins_start_end
from adcern.sqlite3_backend import modify_db

from sklearn.preprocessing import RobustScaler
import sqlite3

# NB: a higher score means the sample is more likely an outlier


class BaseOutlierAnalyser(ABC):

    @abstractmethod
    def __init__(self):
        """Abstract class for all outlier detection algorithms.

        Attributes
        ----------
        scores : numpy array of shape (n_samples,)
            The outlier scores of the training data.
            The higher, the more abnormal. Outliers tend to have higher
            scores. This value is available once the detector is fitted.
        host_names : numpy array of shape (n_samples,)
            Each entry is a string. It contains the host names of the
            machines used as training data.
        """
        pass

    @abstractmethod
    def fit(self, X, prov, y=None, value_for_missing=0):
        """Fit the model using X as training data.

        Parameters
        ----------
        X : array-like shape (n_samples, n_features)
            Training data.
        prov : array-like
            Info about the provenance of each sample in the X training
            data, i.e. the machine name and the time of the recorded data.
        y : Ignored
            not used, present for API consistency by convention.
        value_for_missing : int
                            number to put in place of missing values
        """
        pass

    @abstractmethod
    def predict(self, X, prov, y=None, value_for_missing=0):
        """Predict the scores for the X testing data.

        Parameters
        ----------
        X : array-like shape (n_samples, n_features)
            Testing data.
        prov : array-like
            Info about the provenance of each sample in the X testing
            data, i.e. the machine name and the time of the recorded data.
        y : Ignored
            not used, present for API consistency by convention.
        value_for_missing : int
                            number to put in place of missing values
        Returns
        -------
        scores : list of scores. Higher = More critical
        """
        pass

    def get_scores(self, on_machines=None):
        """Get the score of the machines analyzed."""
        return self.scores

    def get_machines(self):
        """Get the names of the machines analyzed."""
        return self.host_names

    def get_most_critical(self, on_machines=None, percentile=90,
                          threshold=None, max_nr_machines=None):
        """Get only a subset of machines.

        You can choose the most critical based on the some criteria.
        Parameters
        ----------
        on_machines : list of str, optional
            Machine on which you want to restrict the function. You will run
            the criteria only on the scores of this subset
        percentile : int [from 0 to 100], optional
98
            DEFAULT: 90.
99
100
            The percentile that you want to get (e.g you can select the top 5%
            highest scores and machines.
101
            NB if threshold is set, this threshold won't take effect.
102
103
        threshold : int, optional
            You will get all the machines and host with a score higher than
104
            the threshold.
105
106
107
108
109
110
111
112
113
114
115
        max_nr_machines : int, optional
            Defines how many machine you want to get (e.g. you can put 10 and
            get the top 10 machines with the highest score). This applies also
            when percentile or threshold are set to further restrict the
            result.
        Returns
        -------
        candidate_hosts : list of str (names of the most critical machines)
        candidate_scores : list of int (scores of the most critical machines)

        """
        self._compute_ranking()
        candidate_scores = self.ordered_scores
        candidate_hosts = self.ordered_host_names
        # consider only the machines that we are interested in
        if on_machines is not None:
            kept = [(host, score)
                    for host, score in zip(candidate_hosts, candidate_scores)
                    if host in on_machines]
            candidate_hosts = [host for host, _ in kept]
            candidate_scores = [score for _, score in kept]
        # if no explicit threshold is given, derive it from the percentile
        if threshold is None:
            threshold = np.percentile(candidate_scores, percentile)
        # cut with the threshold (hosts and scores filtered in lockstep)
        kept = [(host, score)
                for host, score in zip(candidate_hosts, candidate_scores)
                if score > threshold]
        candidate_hosts = [host for host, _ in kept]
        candidate_scores = [score for _, score in kept]
        # limit the number of results
        if max_nr_machines is not None and \
                len(candidate_hosts) > max_nr_machines:
            candidate_scores = candidate_scores[:max_nr_machines]
            candidate_hosts = candidate_hosts[:max_nr_machines]
        return candidate_hosts, candidate_scores
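
    # Example (sketch, illustrative values): the top 5% highest-scoring
    # machines, capped at 10 entries, could be obtained with
    #   hosts, scores = analyser.get_most_critical(percentile=95,
    #                                              max_nr_machines=10)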

    def _compute_ranking(self):
        """Compute the ranking of the scores."""
        ordered_index = np.argsort(self.scores)[::-1]
        self.ordered_scores = [self.scores[i] for i in ordered_index]
        self.ordered_host_names = [self.host_names[i] for i in ordered_index]
        self.ranks = [sorted(self.scores, reverse=True).index(x) + 1 for x in self.scores]  # noqa
        self.percentiles = [scipy.stats.percentileofscore(self.scores, a, 'weak')  # noqa
                            for a in self.scores]
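
    # Example (sketch): for scores [5.0, 1.0, 3.0] this yields
    #   ordered_scores -> [5.0, 3.0, 1.0]
    #   ranks          -> [1, 3, 2]
    #   percentiles    -> approx. [100.0, 33.3, 66.7]
    #                     ('weak': percentage of scores <= x)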

    def prepare_to_publish(self,
                           dataset,
                           normalization_name,
                           normalization_id,
                           slide_steps,
                           history_len):
        """Give all the info necesssary to publish.

        Parameters
        ----------
        dataset: CollectdDataset object
            dataset in which we are spotting anomalies
        cell_name, hostgroup, plugins, granularity_min,
        class_name, class_version,
        normalization,
        slide_steps, history_len,

        - TO BE DONE IN THE CHILD CLASS
        hyperparameters_str,
        hyperparameters_int,
        hyperparameters_float,
        hyperparameters_bool,
        """
        meta = dataset.get_metadata()
        self.cell_name = meta["cell_name"]
        self.hostgroup = "unsupported"
        self.granularity_min = meta["granularity_min"]
        self.plugins = meta["plugins"]
        self.normalization_name = normalization_name
        self.normalization_id = normalization_id
        self.slide_steps = slide_steps
        self.history_len = history_len
        self.version = -1.0
        self.classname = self.__class__.__name__

    def prepare_to_publish_config(self,
                                  config_file,
                                  slide_steps):
        """Give all the info necesssary to publish.

        Parameters
        ----------
        config_file: str
            path to json file with settings
        """
        config_dictionary = read_json_config(config_file)
        hostgroup = config_dictionary["hostgroups"][0]
        self.cell_name = hostgroup[hostgroup.rfind("/") + 1:]
        self.hostgroup = hostgroup
        self.granularity_min = config_dictionary["aggregate_every_n_minutes"]
        self.plugins = config_dictionary["selected_plugins"]
        self.normalization_name = "standardization"
        self.normalization_id = \
            create_id_for_hostrgroups_and_plugins_start_end(
                hostgroups=config_dictionary["hostgroups"],
                plugins=config_dictionary["selected_plugins"],
                start=config_dictionary["date_start_normalization"],
                end=config_dictionary["date_end_normalization_excluded"])
        self.slide_steps = slide_steps
        self.history_len = config_dictionary["history_steps"]
        self.version = -1.0
        self.classname = self.__class__.__name__

    def publish_anomaly(self,
                        entity,
                        score,
                        rank,
                        percentile,
                        ts_second_window_start, ts_second_window_end,
                        nr_anomalous_metrics=-1, nr_total_metrics=-1,
                        ordered_anomalous_metrics=["unsupported"],
                        validity=False):
        """Publish one single anomaly of one entity.

        Parameters
        ----------
        entity, score, rank, percentile,
        ts_second_window_start, ts_second_window_end,
        nr_anomalous_metrics, nr_total_metrics,
        ordered_anomalous_metrics,
        validity=False
        """
        self.classname = self.__class__.__name__
        if self.classname == "PyODWrapperAnalyzer":
            self.classname += "_" + self.pyod_detector.__class__.__name__

        id_monit, package_for_monit = generate_result(
            class_name=self.classname,
            class_version=self.version,
            hyperparameters_str={},
            hyperparameters_int={},
            hyperparameters_float={},
            hyperparameters_bool={},
            cell_name=self.cell_name,
            hostgroup=self.hostgroup,
            plugins=self.plugins,
            granularity_min=self.granularity_min,
            normalization=self.normalization_name,
            entity=entity,
            score=score,
            percentile=percentile,
            rank=rank,
            ts_second_window_start=ts_second_window_start,
            ts_second_window_end=ts_second_window_end,
            slide_steps=self.slide_steps,
            history_len=self.history_len,
            nr_anomalous_metrics=nr_anomalous_metrics,
            nr_total_metrics=nr_total_metrics,
            ordered_anomalous_metrics=ordered_anomalous_metrics,
            validity=validity)

        print(json.dumps(package_for_monit))
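
        # The printed JSON is the payload for MONIT; its fields mirror the
        # generate_result(...) arguments above (sketch; the exact schema is
        # defined in adcern.publisher).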

    def publish_top_k(self,
                      ts_second_window_end,
                      top_k=3,
                      standardized_values=True,
                      validity=False,
                      ts_second_window_start=None):
        """Publish top k anmalies.

        If ts_second_window_start is not specified it will be computed from the
        end and the info about the granularity in minutes and the length on the
        history.
        """
        self._compute_ranking()
        if standardized_values:
            scores_selected = self.std_scores
        else:
            scores_selected = self.scores
        if ts_second_window_start is None:
            ts_second_window_start = \
                ts_second_window_end - (self.history_len * self.granularity_min * 60)  # noqa
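            # e.g. with history_len=48 steps at granularity_min=10, the
            # window starts 48 * 10 * 60 = 28800 seconds before its end
            # (illustrative figures)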
        counter = 0
        for (entity, score, percentile, rank) in \
                zip(self.host_names, scores_selected, self.percentiles, self.ranks):  # noqa
            if (rank <= top_k) and (counter < top_k):
                counter += 1
                entity_without_cern_ch = entity.split(".")[0]
                self.publish_anomaly(entity_without_cern_ch,
                                     score,
                                     rank,
                                     percentile,
                                     ts_second_window_start,
                                     ts_second_window_end,
                                     validity=validity)

    def save_csv_scores(
            self,
            score_folder,
            config_file,
            scores,
            entities,
            end_window):
        """Save a csv with scores."""
        config_dictionary = read_json_config(config_file)
        base_chunk_name = config_dictionary["code_project_name"]
        self.classname = self.__class__.__name__
        if self.classname == "PyODWrapperAnalyzer":
            self.classname += "_" + self.pyod_detector.__class__.__name__

        csv_name = self.classname + "_" + \
            base_chunk_name + "_" + str(int(end_window)) + ".csv"
        df = pd.DataFrame({"hostname": entities,
                           "score": scores,
                           "timestamp": str(int(end_window))})
        df.to_csv(score_folder + "/" + csv_name, index=False)
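        # e.g. this writes
        # "<score_folder>/PyODWrapperAnalyzer_IForest_myproject_1609459200.csv"
        # (illustrative names) with columns: hostname, score, timestamp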

    def save_scores_local_sqlite(
            self,
            score_folder,
            config_file,
            scores,
            entities,
            end_window):
        """Save scores in a Sqlite3 database.

        Every tuple (score, entity, end_window) becomes a new record of the
        table scores. It also adds columns for:
        - algorithm: name of the class representing this analyser
        - hostgroup: name of the hostgroup
        - noramlization_id: id condensing hostgroup, plugins, normalization
            start and normalization end.

        Parameters
        ----------
        score_folder: str
            folder where the database is (if not present it is created)
        config_file: str
            path to the JSON file containing the configuration of the test
        scores: list(float)
            scores of the servers, each float is the score of one server
        entities: list(str)
            names of the servers, each string is the name of one server
        end_window: int
            the timestamp of the end of the window; it is broadcast to all
            the records
        """
        # get general info for every line
        config_dictionary = read_json_config(config_file)
        hostgroup = config_dictionary["hostgroups"][0]
        algorithm_name = self.__class__.__name__
        if algorithm_name == "PyODWrapperAnalyzer":
            algorithm_name = self.pyod_detector.__class__.__name__
        normalization_id = \
            create_id_for_hostrgroups_and_plugins_start_end(
                hostgroups=config_dictionary["hostgroups"],
                plugins=config_dictionary["selected_plugins"],
                start=config_dictionary["date_start_normalization"],
                end=config_dictionary["date_end_normalization_excluded"])

        # prepare result to inject
        df = pd.DataFrame({"hostgroup": str(hostgroup),
                           "hostname": entities,
                           "algorithm": str(algorithm_name),
                           "score": scores,
                           "end_window": str(int(end_window)),
                           "noramlization_id": str(normalization_id)})
        # connect to the db
        conn_score = sqlite3.connect(score_folder + '/scores.db', timeout=120)

        # create the scores table (if missing) with a composite primary key
        modify_db(
            conn=conn_score,
            query='''CREATE TABLE IF NOT EXISTS scores
                (hostgroup text, hostname text, algorithm text,
                score real, end_window int, noramlization_id text,
                PRIMARY KEY (hostgroup, hostname, algorithm,
                end_window, noramlization_id))''',
            upperbound=10)

        # add row by row
        num_rows = len(df)
        for i in range(num_rows):
            # try inserting the row; duplicates of the primary key are
            # skipped by INSERT OR IGNORE
            row = tuple(df.iloc[i])
            # NB: the row values are passed as the last positional argument,
            # assuming a modify_db(conn, query, upperbound, row) signature
            modify_db(
                conn_score,
                '''INSERT OR IGNORE INTO scores
                    VALUES (?, ?, ?, ?, ?, ?)''',
                10,
                row)

        conn_score.close()
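
        # Example (sketch, not executed): reading the saved scores back
        #   conn = sqlite3.connect(score_folder + '/scores.db')
        #   top = conn.execute(
        #       "SELECT hostname, score FROM scores "
        #       "WHERE algorithm = ? ORDER BY score DESC LIMIT 5",
        #       (algorithm_name,)).fetchall()
        #   conn.close()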


class PyODWrapperAnalyzer(BaseOutlierAnalyser):
    """Highlight anomalies by using a PyOD detector with PyOD APIs."""

    def __init__(self,
                 pyod_detector,
                 normalization_anomaly_scores=True,
                 robust=False):
        """Create the wrapper of a PyOD detector."""
        self.pyod_detector = pyod_detector
        self.robust = robust
        self.normalization_anomaly_scores = normalization_anomaly_scores
        self.version = 1.0

    def fit(self, X, prov, y=None, value_for_missing=0, forward_prov=False):
        """Fit the model using X as training data."""
        # print("PyOD in action! Nucleus: %s" % self.pyod_detector)
        # fill missing values
        X_cleaned = X.fillna(value_for_missing)
        # fit the wrapped PyOD detector
        if forward_prov:
            self.pyod_detector.fit(X_cleaned, prov=prov)
        else:
            self.pyod_detector.fit(X_cleaned)
        if self.normalization_anomaly_scores:
            self._compute_coefficients_std_score()

    def predict(self, X, prov, y=None, value_for_missing=0):
        """Predict the model using X as testig data."""
        X_cleaned = X.fillna(value_for_missing)
        self.scores = self.pyod_detector.decision_function(X_cleaned)
        # print("prov: ", prov.shape)
        if len(prov.shape) == 2:
            self.host_names = [h for h in prov.iloc[:, 0].values.flatten()]
        else:
            self.host_names = prov[0]
        return self.scores

    def get_std_train_scores(self):
        """Get all the scores standardized during training."""
        train_scores = self.pyod_detector.decision_scores_
        return self.standardize_with_train(train_scores)

    def get_train_scores(self):
        """Get the anomaly scores on the train set."""
        return self.pyod_detector.decision_scores_

    def predict_std(self, X, prov, y=None, value_for_missing=0):
        """Predict the scores and standardize with training scores."""
        scores = self.predict(X, prov, y, value_for_missing)
        std_scores = self.standardize_with_train(scores)
        self.std_scores = std_scores
        return std_scores

    def _compute_coefficients_std_score(self):
        """Compute the standardization coefficients form training."""
        train_scores = self.pyod_detector.decision_scores_
        if self.normalization_anomaly_scores:
            if self.robust:
                np_scores = np.array(train_scores)
                transformer = RobustScaler().fit(np_scores.reshape(-1, 1))
                self.mean_train = transformer.center_[0]
                self.std_train = transformer.scale_[0]
            else:
                self.mean_train = np.mean(train_scores)
                self.std_train = np.std(train_scores)
        else:
            self.mean_train = 0
            self.std_train = 1
        # print(self.__class__, " mean:", self.mean_train)
        # print(self.__class__, " std:", self.std_train)

    def standardize_with_train(self, scores):
        """Standardize with the training coefficients."""
        return (scores - self.mean_train) / self.std_train
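
    # The two normalizations computed in _compute_coefficients_std_score,
    # applied here as z = (s - center) / scale:
    #   robust=False: center = mean(train scores),   scale = std(train scores)
    #   robust=True:  center = median(train scores), scale = IQR
    #                 (via sklearn's RobustScaler)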

    def predict_above_threshold(self, X, prov, y=None, value_for_missing=0,
                                threshold=2):
        """Predict the scores and standardize with training scores."""
        std_scores = self.predict_std(X, prov, y, value_for_missing)
        mask = std_scores > threshold
        host_names_outliers = [h for h
                               in prov.iloc[mask, 0].values.flatten()]
        scores_outliers = std_scores[mask]
        return host_names_outliers, scores_outliers

    def __repr__(self):
        """Create representation based on PyOD object."""
        return self.pyod_detector.__repr__()
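
    # Example (sketch, assuming PyOD is installed and that X_train/X_test are
    # pandas DataFrames of metrics with prov DataFrames carrying the host
    # name in column 0; IForest is illustrative, any PyOD detector works):
    #
    #   from pyod.models.iforest import IForest
    #   analyser = PyODWrapperAnalyzer(IForest())
    #   analyser.fit(X_train, prov=prov_train)
    #   std_scores = analyser.predict_std(X_test, prov=prov_test)
    #   hosts, outlier_scores = analyser.predict_above_threshold(
    #       X_test, prov_test, threshold=2)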


def divide_hyperparameters(algo):
    """Given an object algorithm detection, get the hypermarams."""
    all_params = algo.__dict__

    hyp_str = {}
    hyp_int = {}
    hyp_float = {}
    hyp_bool = {}

    for k in all_params.keys():
        current = all_params[k]
        if isinstance(current, str):
            hyp_str[k] = str(current)
        elif isinstance(current, bool):
            # NB: bool must be tested before int, since bool is a
            # subclass of int in Python
            hyp_bool[k] = bool(current)
        elif isinstance(current, int):
            hyp_int[k] = str(current)
        elif isinstance(current, float):
            hyp_float[k] = str(current)
        else:
            hyp_str[k] = str(current)
    return hyp_str, hyp_int, hyp_float, hyp_bool
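

# Example (sketch): splitting the hyperparameters of a detector; the IForest
# import is illustrative, any object with instance attributes works:
#
#   from pyod.models.iforest import IForest
#   hyp_str, hyp_int, hyp_float, hyp_bool = divide_hyperparameters(IForest())
#   # int and float values are stringified, ready for generate_result(...)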