data_mining.py
#!/usr/bin/env python
import click
import sqlite3
import pandas as pd
import json
import yaml
import matplotlib  # noqa
import matplotlib.pyplot as plt  # noqa
import numpy as np

from pathlib import Path
import time
from etl.spark_etl.utils import read_local_window_dataset
import adcern.analyser as analyser
import sys
import os
import inspect
import copy
import importlib
from hashlib import blake2b

os.environ['PYSPARK_PYTHON'] = sys.executable
from etl.spark_etl.utils import read_json_config  # noqa
from adcern.publisher import create_id_for_hostrgroups_and_plugins_start_end  # noqa
from adcern.sqlite3_backend import modify_db

# TRADITIONAL
# from pyod.models.iforest import IForest  # noqa
# from pyod.models.pca import PCA  # noqa
# from pyod.models.ocsvm import OCSVM  # noqa
# from pyod.models.lof import LOF  # noqa
# from pyod.models.knn import KNN  # noqa
# from adcern.analyser_forecasting import ForecastVAR  # noqa

# # DEEP LEARNING
# from adcern.analyser_deep import AEDenseTF2  # noqa
# from adcern.analyser_deep import AECnnTF2  # noqa
# from adcern.analyser_deep import AELstmTF2  # noqa
# from adcern.analyser_forecasting import ForecastCNN  # noqa
# from adcern.analyser_baseline import PercScore  # noqa

# MINING
from etl.spark_etl.utils import read_local_window_dataset  # noqa
from etl.spark_etl.etl_pipeline import get_normalization_pandas  # noqa
from etl.spark_etl.etl_pipeline import pipeline_preparation_norm_coeff  # noqa
from etl.spark_etl import cluster_utils  # noqa

from etl.spark_etl.etl_pipeline import run_pipeline  # noqa
from etl.spark_etl.etl_pipeline import run_pipeline_all_in_one  # noqa

from pyspark import SparkConf  # noqa: E402

# CACHE - MOVE LOCALLY
import shutil  # noqa
from etl.spark_etl.etl_pipeline import materialize_locally  # noqa


'''
EXAMPLE OF FUNCTION:
Write it in the click group named "cli":

@cli.command()
@click.option('--param1', default="",
              help="""description for 1st param.""")
@click.option('--param2', default="",
              help="""description for 2nd param.""")
def demo_function(param1, param2):
    """Docstring for the function."""
    pass

Then in the DAG create a task like this:


from custom_operators import DockerToPythonOperator

with DAG(...) as dag:
    # CREATE TASK ANALYSIS
    dict_options_function_task = {
        "param1": 7,
        "param2": "path/to/my/file.json"
    }
    task = \
        DockerToPythonOperator(
            task_id="my_cool_task",
            python_script="path/to/python/script/file/containing/click/funct",
            forward_hyperparameters=parameters,
            function_name="demo_function",
            dict_options=dict_options_function_task)

'''
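
# For reference, the same click commands can also be invoked directly from a
# shell. A minimal sketch (file locations and option values are hypothetical):
#
#   python data_mining.py compute-normalization --resource_file resource.json
#   python data_mining.py analysis \
#       --module_name pyod.models.lof --class_name LOF \
#       --hyperparameters "{'n_neighbors': 200}" \
#       --train_path train.json --test_path test.json \
#       --analysis_path analysis.yaml \
#       --train_on_test False --subsample_for_train=-1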


# HELPER FUNCTION


def create_instance(module_name, class_name, init_params_dict):
    """Create an instance of a class from the name and params.

    Params
    ------
    module_name: str
        name of the file containing the class
    class_name: str
        name of the class inside the file
    init_params_dict: dict(str)
        dictionary of parameters to initialize the class.
    """
    # import the module
    my_module_with_analyser = importlib.import_module(module_name)

    # get analyser class
    my_analyser_class = getattr(my_module_with_analyser, class_name)

    # instantiate the class
    instance = my_analyser_class(**init_params_dict)

    # # recursively get all the modules
    # modules_string = module_name.split(".") + [class_name]

    # top_module_name = modules_string[0]
    # lower_level_jumps = modules_string[1:]

    # print("import top_module_name: ", top_module_name)
    # module = __import__(top_module_name)
    # for name_attr in lower_level_jumps:
    #     print("module before -> ")
    #     print(module)
    #     print("name_attr before -> ")
    #     print(name_attr)
    #     module = getattr(module, name_attr)
    # class_ = module

    # print("class_ end -> ")
    # print(class_)
    # instance = class_(**init_params_dict)

    print("Analyser instance created: ", str(instance))
    return instance
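
# Example usage (a minimal sketch; the module path, class name and parameters
# below are only illustrative, mirroring the commented PyOD imports above):
#
#   lof_detector = create_instance(module_name="pyod.models.lof",
#                                  class_name="LOF",
#                                  init_params_dict={"n_neighbors": 200})
#   analyser_instance = analyser.PyODWrapperAnalyzer(lof_detector)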


def myprint(astring):
    print("\n" + "{:p<50}".format('') + "\n")
    print(astring)
    print("\n" + "{:p<50}".format('') + "\n")


def spark_preparation():
    """Prepare spark context."""
    try:
        # PREPARE SPARK
        myprint('PREPARING SPARK:')
        spark_conf = SparkConf()
        spark_conf.set('spark.driver.memory', '1g')
        spark_conf.set('spark.executor.memory', '2g')
        spark_conf.set('spark.driver.maxResultSize', 0)
        sc, spark, conf = cluster_utils.set_spark(spark_conf=spark_conf)
        myprint('SPARK CONTEXT: ' + str(sc))
        myprint('SPARK OBJECT: ' + str(spark))
    except Exception as e:
        myprint('ERROR WITH SPARK ACCESS:')
        print("Detail Error: ", e)
        sys.exit(1)
    return sc, spark, conf


def read_resource(resource_file):
    """Read resource configuration files."""
    try:
        myprint("RESOURCE DETAILS: %s" % resource_file)
        file_content = read_json_config(resource_file)
        print(json.dumps(file_content, indent=4))
    except Exception as e:
        myprint('ERROR WITH RESOURCE ACCESS:')
        print("Detail Error: ", e)
        sys.exit(1)
    return file_content


def save_scores_local_sqlite(analyser,
                             score_folder,
                             config_file,
                             scores,
                             entities,
                             end_window):
    """Save scores in a Sqlite3 database.

    Every tuple (score, entity, end_window) becomes a new record of the
    table scores. It also add columns for:
    - algorithm: name of the class representiong this analyser
    - hostgroup: name of the hostgroup
    - noramlization_id: condensing hostgroup, plugins, normalization start
        normalization end.

    Params
    ------
    score_folder: str
        folder where the database is (if not present it is created)
    config_file: str
        path to the JSON file contining the configuration of the test
    scores: list(float)
        scores of the server, each float is the score of one server
    entities: list(str)
        names of the servers, each string is the name of one server
    end_window: int
        the timestamp of the end of the window. it is broadcasted to all
        the records
    """
    # get general info for every line
    config_dictionary = read_json_config(config_file)
    hostgroup = config_dictionary["hostgroups"][0]
    algorithm_name = analyser.__class__.__name__
    if algorithm_name == "PyODWrapperAnalyzer":
        algorithm_name = analyser.pyod_detector.__class__.__name__
    normalization_id = \
        create_id_for_hostrgroups_and_plugins_start_end(
            hostgroups=config_dictionary["hostgroups"],
            plugins=config_dictionary["selected_plugins"],
            start=config_dictionary["date_start_normalization"],
            end=config_dictionary["date_end_normalization_excluded"])

    # prepare result to inject
    df = pd.DataFrame({"hostgroup": str(hostgroup),
                       "hostname": entities,
                       "algorithm": str(algorithm_name),
                       "score": scores,
                       "end_window": str(int(end_window)),
                       "noramlization_id": str(normalization_id)})
    # connect to the db
    conn_score = \
        sqlite3.connect(score_folder
                        + '/scores_' + algorithm_name + '.db', timeout=120)

    # ensure the table is there
    modify_db(
            conn=conn_score,
            query='''CREATE TABLE IF NOT EXISTS scores
                (hostgroup text, hostname text, algorithm text,
                score real, end_window int, noramlization_id text,
                PRIMARY KEY (hostgroup, hostname, algorithm, end_window,
                noramlization_id))''',
            upperbound=10)

    # add row by row
    num_rows = len(df)
    for i in range(num_rows):
        # Try inserting the row
        row = df.iloc[i]
        modify_db(
                conn=conn_score,
                query='''INSERT OR IGNORE INTO scores
                    VALUES (?, ?, ?, ?, ?, ?)''',
                upperbound=10,
                params=row)

    conn_score.close()
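
# A minimal usage sketch (all values below are illustrative; the config file
# must provide the keys read above, e.g. "hostgroups" and "selected_plugins"):
#
#   save_scores_local_sqlite(analyser=analyser_instance,
#                            score_folder="/tmp/scores",
#                            config_file="windows/test_resource.json",
#                            scores=[0.1, 3.7],
#                            entities=["host-a.cern.ch", "host-b.cern.ch"],
#                            end_window=1588291200)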


def save_scores_local_parquet(algorithm_name,
                              algo_parameters,
                              score_folder,
                              config_file,
                              scores,
                              entities,
                              end_window):
    """Save scores in a parquet file.

    Every tuple (score, entity, end_window) becomes a new record.
    It also add columns for:
    - algorithm: name of the class representiong this analyser
    - hostgroup: name of the hostgroup
    - noramlization_id: condensing hostgroup, plugins, normalization start
        normalization end.

    Params
    ------
276
277
278
279
    algorithm_name: str
        name used to save this data
    algo_parameters: dict(str)
        contains all the parameters of the raw initialized object
    score_folder: str
        folder where the database is (if not present it is created)
    config_file: str
        path to the JSON file containing the configuration of the test
    scores: list(float)
        scores of the server, each float is the score of one server
    entities: list(str)
        names of the servers, each string is the name of one server
    end_window: int
        the timestamp of the end of the window. It is broadcast to all
        the records
    """
    # get general info for every line
    config_dictionary = read_json_config(config_file)
    hostgroup = config_dictionary["hostgroups"][0]
    normalization_id = \
        create_id_for_hostrgroups_and_plugins_start_end(
            hostgroups=config_dictionary["hostgroups"],
            plugins=config_dictionary["selected_plugins"],
            start=config_dictionary["date_start_normalization"],
            end=config_dictionary["date_end_normalization_excluded"])
    # create initial data
    data = {"hostgroup": str(hostgroup),
            "hostname": entities,
            "algorithm": str(algorithm_name),
            "score": scores,
            "end_window": str(int(end_window)),
            "noramlization_id": str(normalization_id)}
    # append parameters (if any)
    if algo_parameters is not None:
        data = {**data, **algo_parameters}
    # prepare result to inject
    df = pd.DataFrame(data)
    # algo identifier
    id_algo = "NOPARAMS"
    if algo_parameters is not None:
        h_algo = blake2b(digest_size=5)
        h_algo.update(str.encode(algorithm_name +
                                 json.dumps(algo_parameters)))
        id_algo = h_algo.hexdigest()
    # dump to a parquet file
    df.to_parquet(score_folder + '/scores_' + str(algorithm_name)
                  + '_ID_' + str(id_algo)
                  + '_W_' + str(int(end_window)) + '.parquet',
                  index=False)
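    # For illustration (hypothetical values): with algorithm_name "LOF",
    # id_algo "ab12cd34ef" (blake2b with digest_size=5 gives 10 hex chars)
    # and end_window 1588291200, the call above writes a file named
    # scores_LOF_ID_ab12cd34ef_W_1588291200.parquet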

# CONSOLE APPLICATION

# see https://github.com/pallets/click/issues/1123
def normalize_names(name):
    return name.replace("_", "-")
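
# With the token normalizer above, underscore and hyphen spellings are meant
# to be interchangeable on the command line (see the click issue referenced
# above), e.g. "compute_normalization" and "compute-normalization".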

@click.group(context_settings={"token_normalize_func": normalize_names})
def cli():
    print("Welcome in the Mining and Detection CLI.")


@cli.command()
@click.option('--resource_file', default="",
              help="""path to json file defining which normalization
                      coefficients to check.""")
def normalization_presence(resource_file):
    """Check for the presence of normalization coefficients."""
    # PREPARE SPARK
    sc, spark, conf = spark_preparation()
    # READ RESOURCE FILE
    read_resource(resource_file=resource_file)
    # CHECK if the NORMALIZATION is already present
    myprint('CHECK PRESENCE OF NORMALIZATION COEFFICIENTS...')
    try:
        norm_pdf = get_normalization_pandas(spark=spark,
                                            config_filepath=resource_file)
    except Exception as e:
        myprint("FAILURE - NORMALIZATION COEFFICIENTS NOT PRESENT :/")
        print("Detail Error: ", e)
        sys.exit(1)
    myprint("NORMALIZATION COEFFICIENTS FOUND :)")
    myprint('SUCCESS (inspect the first 40 rows):')
    print(norm_pdf.head(40))


@cli.command()
@click.option('--resource_file', default="",
              help="""path to json file defining which coeff to compute.""")
def compute_normalization(resource_file):
    """Compute normalization coefficients."""
    # PREPARE SPARK
    sc, spark, conf = spark_preparation()
    # READ RESOURCE FILE
    read_resource(resource_file=resource_file)
    myprint('COMPUTE NORMALIZATION COEFFICIENTS:')
    try:
        # NORMALIZATION COEFF
        norm_ret, norm_path = \
            pipeline_preparation_norm_coeff(spark=spark,
                                            config_filepath=resource_file)
        print("Coefficient preparation shared: ", norm_ret)
        # PRINT TABLE
        norm_pdf = get_normalization_pandas(spark=spark,
                                            config_filepath=resource_file)
    except Exception as e:
        myprint('FAILURE - ERROR DURING COEFFICIENT MINING:')
        print("Detail Error: ", e)
        sys.exit(1)
    myprint('SUCCESS (inspect the first 40 rows):')
    print(norm_pdf.head(40))


@cli.command()
@click.option('--resource_file', default="",
              help="""path to json file defining the data we need.""")
def data_presence(resource_file):
    """Check that the windowed dataset is already available locally."""
    # PREPARE SPARK
    # sc, spark, conf = spark_preparation()
    # READ RESOURCE FILE
    # read_resource(resource_file=resource_file)
    # CHECK IF ALREADY ON HDFS
    # if data are of the same month this will get the data from the
    # previous run
    myprint('CHECK IF DATA ARE ALREADY AVAILABLE LOCALLY:')
    try:
        X, prov, nr_timeseries = \
            read_local_window_dataset(config_filepath=resource_file)
    except Exception as e:
        myprint('FAILURE - NO DATA LOCALLY:')
        print("Detail Error: ", e)
        sys.exit(1)
    myprint('SUCCESS - CACHE HIT - DATA ARE ALREADY AVAILABLE LOCALLY.')


@cli.command()
@click.option('--resource_file', default="",
              help="""path to json file defining what to download.""")
def transform_data(resource_file):
    """Run the Spark mining pipeline and aggregate the data in HDFS."""
    # PREPARE SPARK
    sc, spark, conf = spark_preparation()
    # READ RESOURCE FILE
    read_resource(resource_file=resource_file)
    myprint('DOWNLOAD DATA - LONG MINING PROCESS...')
    try:
        # CREATE WINDOWS
        run_pipeline_all_in_one(spark=spark,
                                config_filepath=resource_file)
    except Exception as e:
        myprint('FAILURE - PROBLEM OCCURRED IN SPARK MINING PROCEDURE:')
        print("Detail Error: ", e)
        sys.exit(1)
    myprint('SUCCESS - DATA AGGREGATED IN HDFS.')


@cli.command()
@click.option('--resource_file', default="",
              help="""path to json file defining what to cache.""")
def copy_locally(resource_file):
    """Copy your data locally (aka move them from spark to local disk)."""
    # PREPARE SPARK
    sc, spark, conf = spark_preparation()
    # READ RESOURCE FILE
    file_content = read_resource(resource_file=resource_file)
    myprint('CACHE NEW DATA:')
    print("ASSUMPTION: no data was in cache before and "
          "if anything is there, it will be deleted.")
    print("Deleting any old remainders...")
    dir_path = \
        file_content["local_cache_folder"] + \
        file_content["code_project_name"] + "/"
    try:
        shutil.rmtree(dir_path)
    except Exception as e_delete:
        myprint('FAILURE - DURING CACHE CLEANING')
        print('Error while deleting directory: ', e_delete)
    # create folders locally if they do not exist
    Path(file_content["local_cache_folder"])\
        .mkdir(parents=True, exist_ok=True)
    myprint("CACHE CREATION")
    try:
        # MATERIALIZE IN CACHE
        materialize_locally(spark=spark,
                            config_filepath=resource_file,
                            slide_step=file_content["slide_steps"],
                            hostgroup_position=0)
        # save metadata also
        new_config_file_name = file_content["local_cache_folder"] + \
            file_content["code_project_name"] + ".metadata"
        print("Save the config file locally: %s" %
              new_config_file_name)
        with open(new_config_file_name, 'w') as file:
            json.dump(file_content, file)
    except Exception as e:
        myprint('FAILURE - DURING CACHE CREATION:')
        print("Detail Error: ", e)
        sys.exit(1)
    myprint("SUCCESS - CACHED DATA AVAILABLE LOCALLY")


@cli.command()
@click.option('--module_name', default="",
              help='''dotted module path of the file
                      containing the class.''')
@click.option('--class_name', default="",
              help='''name of the class inside the file.''')
@click.option('--alias_name', default=None,
              help='''name of the algo in the table.''')
@click.option('--train_path', default="",
              help='''path of the JSON file for the
                      description of the TRAIN.''')
@click.option('--test_path', default="",
              help='''path of the JSON file for the description
                      of the TEST.''')
@click.option('--analysis_path', default="",
              help='''path of the JSON to describe the ANALYSIS.''')
@click.option('--hyperparameters', default="",
              help='''dictionary of parameters to initialize the class.''')
@click.option('--train_on_test', default="",
              help='''if you want to train only on the test window.''')
@click.option('--subsample_for_train', default="",
              help='''nr of samples you want to extract to train the model.''')
def analysis(module_name, class_name, alias_name, hyperparameters,
             train_path, test_path, analysis_path, train_on_test,
             subsample_for_train):
    """Analyse data and produce anomaly score."""
    # parse the parameters in a different way because they are
    # in a json format:
    # e.g. --hyperparameters {"n_neighbours": 200, "alias_name": "LOF_200"}
    print("Start " + class_name)
    hyperparameters = json.loads(hyperparameters.replace("'", '"'))

    print("Hyperparameters Analyser:")
    print("type(hyperparameters) -> ", type(hyperparameters))
    print("hyperparameters:", hyperparameters)
    train_on_test = str(train_on_test)
    print("train_on_test: ", train_on_test)
    subsample_for_train = int(subsample_for_train)
    print("subsample_for_train: ", subsample_for_train)

    # Initialize ANALYZER INSTANCE
    core_analyser_instance = \
        create_instance(module_name=module_name,
                        class_name=class_name,
                        init_params_dict=hyperparameters)
    analyser_instance = \
        analyser.PyODWrapperAnalyzer(core_analyser_instance)

    # READ ANALYSIS PARAMS
    # read yaml
    with open(analysis_path) as yaml_file:
        analysis_dict = yaml.safe_load(yaml_file)

    # old - read json
    # with open(analysis_path) as json_file:
    #     analysis_dict = json.load(json_file)
    folder_training_time = analysis_dict["folder_training_time"]
    folder_score = analysis_dict["local_scores_folder"]
    # create folder if they do not exist
    Path(folder_training_time).mkdir(parents=True, exist_ok=True)
    Path(folder_score).mkdir(parents=True, exist_ok=True)

    PUBLISH_PER_WINDOW = analysis_dict["publish_per_windows"]
    RANDOM_SEED = analysis_dict["random_seed"]
    # HISTORY_LEN = analysis_dict["history_steps"]
    SLIDE_STEPS = analysis_dict["slide_steps"]
    # ALGO_NAME = analysis_dict["algo_name"]

    # PREPARE TO SEND STUFF TO MONIT
    analyser_instance.prepare_to_publish_config(
        config_file=train_path,
        slide_steps=SLIDE_STEPS)

    algo_name = class_name

    # SUBSAMPLE (IF SPECIFIED)
    SAMPLES_FOR_TRAIN = None
    if "max_samples_for_train" in analysis_dict.keys():
        SAMPLES_FOR_TRAIN = analysis_dict["max_samples_for_train"]

    # USE A LARGER SAMPLE IF DEEP LEARNING BASED
    if ((class_name in ["AEDenseTF2", "AECnnTF2", "AELstmTF2", "ForecastCNN"])
            and ("max_samples_for_train_deep" in analysis_dict.keys())):
        SAMPLES_FOR_TRAIN = analysis_dict["max_samples_for_train_deep"]

    # USE THE NUMBER SPECIFIED AT THE ALGORITHM LEVEL
    if subsample_for_train > 0:
        SAMPLES_FOR_TRAIN = subsample_for_train

    # USE ALL THE SAMPLES
    if subsample_for_train == -1:
        SAMPLES_FOR_TRAIN = None

    # USE ONLY A SUBSET OF METRICS
    if "list_metrics_to_keep" in analysis_dict.keys():
        LIST_METRICS_TO_KEEP = analysis_dict["list_metrics_to_keep"]
    else:
        LIST_METRICS_TO_KEEP = None

    # GET PATH
    file_path_config_train = train_path
    file_path_config_test = test_path

    # TRAIN
    X_train, prov_train, nr_timeseries = \
        read_local_window_dataset(config_filepath=file_path_config_train,
                                  list_metrics_to_keep=LIST_METRICS_TO_KEEP)
    # TEST
    X_test, prov_test, nr_timeseries = \
        read_local_window_dataset(config_filepath=file_path_config_test,
                                  list_metrics_to_keep=LIST_METRICS_TO_KEEP)

    print("Analyser instance before train: ", str(analyser_instance))

    # get name and parameters (if any)
    algo_parameters = None
    if analyser_instance.__class__.__name__ == "PyODWrapperAnalyzer":
        all_parameters = \
            inspect.getmembers(analyser_instance.pyod_detector,
                               lambda a: not inspect.isroutine(a))
        algo_parameters = \
            dict([("param_" + p[0], str(p[1]))
                  for p in all_parameters if p[0][0] != "_"])
    # use the alias to identify the algo (if available)
    # otherwise use the class name
    algorithm_name = class_name
    if alias_name is not None:
        algorithm_name = alias_name

    print("Algorithm name: ", algorithm_name)
    print("Algo str representation: ", str(analyser_instance))

    # skip the training if the algo is meant to fit and predict directly
    # at test time on one single temporal window of the various machines
    # in the swarm
    if train_on_test == "False":
        print("Full model training (e.g. on the entire prev week)")
        algo = analyser_instance
        # TRAIN MODEL
        train_dataset = X_train

        algo_name = str(algo)

        start_fit = time.time()

        if SAMPLES_FOR_TRAIN is not None:
            if len(train_dataset) > SAMPLES_FOR_TRAIN:  # noqa
                print("Subsampling: ", SAMPLES_FOR_TRAIN)
                np.random.seed(RANDOM_SEED)
                # randomly select the desired number of samples
                train_dataset = \
                    X_train.sample(
                        n=SAMPLES_FOR_TRAIN,
                        random_state=np.random.RandomState(seed=RANDOM_SEED))

        if class_name == "ForecastVAR":
            # forward the provenance to the VAR learner because it needs it to
            # reconstruct the time series starting from the windows of the
            # same machine
            algo.fit(train_dataset, prov=prov_train, forward_prov=True)
        else:
            # FIT THE METHOD
            print("Fitting the methods...")
            algo.fit(train_dataset, prov=prov_train)

        end_fit = time.time()
        # register time required by this algo to fit the data
        training_time = float(end_fit - start_fit)

        # SAVE TRAINING TIME
        with open(file_path_config_train) as json_file:
            data_dict_train = json.load(json_file)
        # connect to the db
        conn = sqlite3.connect(folder_training_time + '/time.db', timeout=120)

        modify_db(
                conn=conn,
                query='''CREATE TABLE IF NOT EXISTS time
                    (date_start text, date_end_excluded text,
                    long_algo_description text,
                    training_time real, measurement_time text,
                    PRIMARY KEY (date_start, date_end_excluded,
                                long_algo_description,
                                measurement_time))''',
                upperbound=10)

        modify_db(
                conn=conn,
                query='''INSERT INTO time
                    VALUES (?, ?, ?, ?, datetime('now', 'localtime'))''',
                upperbound=10,
                params=[data_dict_train["date_start"],
                        data_dict_train["date_end_excluded"],
                        algo_name,
                        training_time])

        conn.close()
    # with open(file_path_config_train) as json_file:
    #     data_dict_train = json.load(json_file)
    # file_training_time = \
    #     folder_training_time + "/time_" + data_dict_train["date_start"] + \
    #     "_" + data_dict_train["date_end_excluded"] + "_" + \
    #     str(int(time.time())) + ".json"
    # with open(file_training_time, 'w') as outfile:
    #     print("Training time - location: %s" %
    #           file_training_time)
    #     json.dump(training_time, outfile)

    # ITERATE AND PUBLISH

    algo = analyser_instance
    print(algo)

    # keep a reference copy of the not-trained model for the analyser
    # a brand new copy will be used for every window
    if train_on_test == "True":
        reference_copy_algo = copy.deepcopy(algo)
        print("Deep reference copy created")
    timestamps = sorted(prov_test["timestamp"].unique())

    for ts in timestamps:

        print("Current window raw ts: ", ts)
        mask_current_window = prov_test["timestamp"] == ts
        X_window = X_test[mask_current_window]
        prov_window = prov_test[mask_current_window]

        # focus on a window if it is not empty
        if (len(X_window) > 1):
            print("Enough data: ok")

            print("-" * 60)
            print("Timestamp: ", prov_window.iloc[0, 1])
            print("=" * 60)

            # run algorithm
            if train_on_test == "True":
                print("TRAIN ON TEST")
                print("We test directly on new data at test time.")
                print("Training a brandnew model for this window")
                algo = copy.deepcopy(reference_copy_algo)
                try:
                    algo.fit(X_window, prov=prov_window)
                    outlier_scores = algo.get_train_scores()
                except AttributeError as e:
                    print(e)
                    print("No train scores saved during train...")
                    print("Never mind, we directly run on the same data")
                    outlier_scores =\
                        algo.predict(X_window, prov=prov_window)
            else:
                print("Predict with the trained model:")
                outlier_scores = algo.predict_std(X_window,
                                                  prov=prov_window)
            # get most critical
            k = PUBLISH_PER_WINDOW
            critical_individuals = \
                [(prov_window.iloc[i, :][0], outlier_scores[i])
                    for i in np.argsort(outlier_scores)[::-1]]
            print("hostname | anomaly score")
            [print(c) for c in critical_individuals[:k]]

            # PUBLISH in FLUENTD
            # create ordered score

            # PercScore has no attribute std_scores
            if class_name == "PercScore":
                standardized_values_tmp = False
            else:
                standardized_values_tmp = True

            ordered_hosts, ordered_scores = zip(*critical_individuals)
            algo.publish_top_k(ts_second_window_end=ts,
                               top_k=PUBLISH_PER_WINDOW,
                               standardized_values=standardized_values_tmp,
                               validity=True)

            print("Save to Parquet...")
            # SAVE TO PARQUET
            save_scores_local_parquet(
                algorithm_name=algorithm_name,
                algo_parameters=algo_parameters,
                score_folder=folder_score,
                config_file=file_path_config_test,
                scores=ordered_scores,
                entities=ordered_hosts,
                end_window=ts)

    print("Iteration on windows finished")
    pass

def main():
    cli()

if __name__ == '__main__':
    main()