#!/usr/bin/env python import click import sqlite3 import pandas as pd import json import yaml import matplotlib # noqa import matplotlib.pyplot as plt # noqa import numpy as np from pathlib import Path import time from etl.spark_etl.utils import read_local_window_dataset import adcern.analyser as analyser import sys import os import inspect import copy import importlib from hashlib import blake2b os.environ['PYSPARK_PYTHON'] = sys.executable from etl.spark_etl.utils import read_json_config # noqa from adcern.publisher import create_id_for_hostrgroups_and_plugins_start_end # noqa from adcern.sqlite3_backend import modify_db # TRADITIONAL # from pyod.models.iforest import IForest # noqa # from pyod.models.pca import PCA # noqa # from pyod.models.ocsvm import OCSVM # noqa # from pyod.models.lof import LOF # noqa # from pyod.models.knn import KNN # noqa # from adcern.analyser_forecasting import ForecastVAR # noqa # # DEEP LEARNING # from adcern.analyser_deep import AEDenseTF2 # noqa # from adcern.analyser_deep import AECnnTF2 # noqa # from adcern.analyser_deep import AELstmTF2 # noqa # from adcern.analyser_forecasting import ForecastCNN # noqa # from adcern.analyser_baseline import PercScore # noqa # MINING from etl.spark_etl.utils import read_local_window_dataset # noqa from etl.spark_etl.etl_pipeline import get_normalization_pandas # noqa from etl.spark_etl.etl_pipeline import pipeline_preparation_norm_coeff # noqa from etl.spark_etl import cluster_utils # noqa from etl.spark_etl.etl_pipeline import run_pipeline # noqa from etl.spark_etl.etl_pipeline import run_pipeline_all_in_one # noqa from pyspark import SparkConf # noqa: E402 # CACHE - MOVE LOCALLY import shutil # noqa from etl.spark_etl.etl_pipeline import materialize_locally # noqa ''' EXAMPLE OF FUNCTION: Write it in the click group named "cli": @cli.command() @click.option('--param1', default="", help="""description for 1st param.""") @click.option('--param2', default="", help="""description for 2nd param.""") def demo_function(param1, param2): """Docstring for the function.""" pass Then in the DAG create a task like this: from custom_operators import DockerToPythonOperator with DAG(...) as dag: # CREATE TASK ANALYSIS dict_options_function_task = { "param1": 7, "param2": "path/to/my/file.json" } task = \ DockerToPythonOperator( task_id="my_cool_task", python_script="path/to/python/script/file/containing/click/funct", forward_hyperparameters=parameters, function_name="demo_function", dict_options=dict_options_function_task) ''' # HELPER FUNCTION def create_instance(module_name, class_name, init_params_dict): """Create an instance of a class from the name and params. Params ------ module_name: str name of the file containing the class class_name: str name of the class inside the file init_params_dict: dict(str) dictionary of parameters to initialize the class. """ # import the module my_module_with_analyser = importlib.import_module(module_name) # get analyser class my_analyser_class = getattr(my_module_with_analyser, class_name) # instantiate the class instance = my_analyser_class(**init_params_dict) # # recursively get all the modules # modules_string = module_name.split(".") + [class_name] # top_module_name = modules_string[0] # lower_level_jumps = modules_string[1:] # print("import top_module_name: ", top_module_name) # module = __import__(top_module_name) # for name_attr in lower_level_jumps: # print("module before -> ") # print(module) # print("name_attr before -> ") # print(name_attr) # module = getattr(module, name_attr) # class_ = module # print("class_ end -> ") # print(class_) # instance = class_(**init_params_dict) print("Analyser instance created: ", str(instance)) return instance def myprint(astring): print("\n" + "{:p<50}".format('') + "\n") print(astring) print("\n" + "{:p<50}".format('') + "\n") def spark_preparation(): """Prepare spark context.""" try: # PREPARE SPARK myprint('PREPARING SPARK:') spark_conf = SparkConf() spark_conf.set('spark.driver.memory', '1g') spark_conf.set('spark.executor.memory', '2g') spark_conf.set('spark.driver.maxResultSize', 0) sc, spark, conf = cluster_utils.set_spark(spark_conf=spark_conf) myprint('SPARK CONTEXT: ' + str(sc)) myprint('SPARK OBJECT: ' + str(spark)) except Exception as e: myprint('ERROR WITH SPARK ACCESS:') print("Detail Error: ", e) sys.exit(1) return sc, spark, conf def read_resource(resource_file): """Read resource configuration files.""" try: myprint("RESOURCE DETAILS: %s" % resource_file) file_content = read_json_config(resource_file) print(json.dumps(file_content, indent=4)) except Exception as e: myprint('ERROR WITH RESOURCE ACCESS:') print("Detail Error: ", e) sys.exit(1) return file_content def save_scores_local_sqlite(analyser, score_folder, config_file, scores, entities, end_window): """Save scores in a Sqlite3 database. Every tuple (score, entity, end_window) becomes a new record of the table scores. It also add columns for: - algorithm: name of the class representiong this analyser - hostgroup: name of the hostgroup - noramlization_id: condensing hostgroup, plugins, normalization start normalization end. Params ------ score_folder: str folder where the database is (if not present it is created) config_file: str path to the JSON file contining the configuration of the test scores: list(float) scores of the server, each float is the score of one server entities: list(str) names of the servers, each string is the name of one server end_window: int the timestamp of the end of the window. it is broadcasted to all the records """ # get general info for every line config_dictionary = read_json_config(config_file) hostgroup = config_dictionary["hostgroups"][0] algorithm_name = analyser.__class__.__name__ if algorithm_name == "PyODWrapperAnalyzer": algorithm_name = analyser.pyod_detector.__class__.__name__ normalization_id = \ create_id_for_hostrgroups_and_plugins_start_end( hostgroups=config_dictionary["hostgroups"], plugins=config_dictionary["selected_plugins"], start=config_dictionary["date_start_normalization"], end=config_dictionary["date_end_normalization_excluded"]) # preapre result to inject df = pd.DataFrame({"hostgroup": str(hostgroup), "hostname": entities, "algorithm": str(algorithm_name), "score": scores, "end_window": str(int(end_window)), "noramlization_id": str(normalization_id)}) # connect to the db conn_score = \ sqlite3.connect(score_folder + '/scores_' + algorithm_name + '.db', timeout=120) # ensure the table is there modify_db( conn = conn_score, query = '''CREATE TABLE IF NOT EXISTS scores (hostgroup text, hostname text, algorithm text, score real, end_window int, noramlization_id text, PRIMARY KEY (hostgroup, hostname, algorithm, end_window, noramlization_id))''', upperbound = 10) # add row by row num_rows = len(df) for i in range(num_rows): # Try inserting the row row = df.iloc[i] modify_db( conn = conn_score, query = '''INSERT OR IGNORE INTO scores VALUES (?, ?, ?, ?, ?, ?)''', upperbound = 10, params = row) conn_score.close() def save_scores_local_parquet(algorithm_name, algo_parameters, score_folder, config_file, scores, entities, end_window): """Save scores in a parquet file. Every tuple (score, entity, end_window) becomes a new record. It also add columns for: - algorithm: name of the class representiong this analyser - hostgroup: name of the hostgroup - noramlization_id: condensing hostgroup, plugins, normalization start normalization end. Params ------ algorithm_name: str name used to save this data algo_parameters: dict(str) contains all the parameters of the raw initialized object score_folder: str folder where the database is (if not present it is created) config_file: str path to the JSON file contining the configuration of the test scores: list(float) scores of the server, each float is the score of one server entities: list(str) names of the servers, each string is the name of one server end_window: int the timestamp of the end of the window. it is broadcasted to all the records """ # get general info for every line config_dictionary = read_json_config(config_file) hostgroup = config_dictionary["hostgroups"][0] normalization_id = \ create_id_for_hostrgroups_and_plugins_start_end( hostgroups=config_dictionary["hostgroups"], plugins=config_dictionary["selected_plugins"], start=config_dictionary["date_start_normalization"], end=config_dictionary["date_end_normalization_excluded"]) # create initial data data = {"hostgroup": str(hostgroup), "hostname": entities, "algorithm": str(algorithm_name), "score": scores, "end_window": str(int(end_window)), "noramlization_id": str(normalization_id)} # append paramenters (if any) if algo_parameters is not None: data = {**data, **algo_parameters} # preapre result to inject df = pd.DataFrame(data) # algo identifier id_algo = "NOPARAMS" if algo_parameters is not None: h_algo = blake2b(digest_size=5) h_algo.update(str.encode(algorithm_name + json.dumps(algo_parameters))) id_algo = h_algo.hexdigest() # dump in a csv df.to_parquet(score_folder + '/scores_' + str(algorithm_name) + '_ID_' + str(id_algo) + '_W_' + str(int(end_window)) + '.parquet', index=False) # CONSOLE APPLICATION # see https://github.com/pallets/click/issues/1123 def normalize_names(name): return name.replace("_", "-") @click.group(context_settings={"token_normalize_func": normalize_names}) def cli(): print("Welcome in the Mining and Detection CLI.") @cli.command() @click.option('--resource_file', default="", help="""path to json file defining whcih normaliz to check.""") def normalization_presence(resource_file): """Check for the presence of normalization coefficients.""" # PREPARE SPARK sc, spark, conf = spark_preparation() # READ RESOURCE FILE read_resource(resource_file=resource_file) # CHECK if the NORMALIZATION is already present myprint('CHECK PRESENCE OF NORMALIZATION COEFFICIENTS...') try: norm_pdf = get_normalization_pandas(spark=spark, config_filepath=resource_file) except Exception as e: myprint("FAILURE - NORMALIZATION COEFFICIENTS NOT PRESENT :/") print("Detail Error: ", e) sys.exit(1) myprint("NORMALIZATION COEFFICIENTS FOUND :)") myprint('SUCCESS (inspect the first 40 rows):') print(norm_pdf.head(40)) @cli.command() @click.option('--resource_file', default="", help="""path to json file defining which coeff to compute.""") def compute_normalization(resource_file): """Compute normalization coefficients.""" # PREPARE SPARK sc, spark, conf = spark_preparation() # READ RESOURCE FILE read_resource(resource_file=resource_file) myprint('COMPUTE NORMALIZATION COEFFICIENTS:') try: # NORAMLIZATION COEFF norm_ret, norm_path = \ pipeline_preparation_norm_coeff(spark=spark, config_filepath=resource_file) print("Coefficient preparation shared: ", norm_ret) # PRINT TABLE norm_pdf = get_normalization_pandas(spark=spark, config_filepath=resource_file) except Exception as e: myprint('FAILURE - ERROR DURING COEFFICIENT MINING:') print("Detail Error: ", e) sys.exit(1) myprint('SUCCESS (inspect the first 40 rows):') print(norm_pdf.head(40)) @cli.command() @click.option('--resource_file', default="", help="""path to json file defining the data we need.""") def data_presence(resource_file): # PREPARE SPARK # sc, spark, conf = spark_preparation() # READ RESOURCE FILE # read_resource(resource_file=resource_file) # CHECK IF ALREADY ON HDFS # if data are of the same month this will get the data from the # previous run myprint('CHECK IF DATA ARE ALREADY AVAILABLE LOCALLY:') try: X, prov, nr_timeseries = \ read_local_window_dataset(config_filepath=resource_file) except Exception as e: myprint('FAILURE - NO DATA LOCALLY:') print("Detail Error: ", e) sys.exit(1) myprint('SUCCESS - CACHE HIT - DATA ARE ALREADY AVAILABLE LOCALLY.') @cli.command() @click.option('--resource_file', default="", help="""path to json file defining what to download.""") def transform_data(resource_file): # PREPARE SPARK sc, spark, conf = spark_preparation() # READ RESOURCE FILE read_resource(resource_file=resource_file) myprint('DOWNLOAD DATA - LONG MINING PROCESS...') try: # CREATE WINDOWS run_pipeline_all_in_one(spark=spark, config_filepath=resource_file) except Exception as e: myprint('FAILURE - PROBLEM OCCURRED IN SPARK MINING PROCEDURE:') print("Detail Error: ", e) sys.exit(1) myprint('SUCCESS - DATA AGGREGATED IN HDFS.') @cli.command() @click.option('--resource_file', default="", help="""path to json file defining what to cache.""") def copy_locally(resource_file): """Copy your data locally (aka move them from spark to local disk).""" # PREPARE SPARK sc, spark, conf = spark_preparation() # READ RESOURCE FILE file_content = read_resource(resource_file=resource_file) myprint('CACHE NEW DATA:') print("ASSUMPTION: no data was in cache before and " "if anything is there, it will be deleted.") print("Deleting any old remainders...") dir_path = \ file_content["local_cache_folder"] + \ file_content["code_project_name"] + "/" try: shutil.rmtree(dir_path) except Exception as e_delete: myprint('FAILURE - DURING CACHE CLEANING') print('Error while deleting directory: ', e_delete) # create folders locally if they do not exist Path(file_content["local_cache_folder"])\ .mkdir(parents=True, exist_ok=True) myprint("CACHE CREATION") try: # MATERIALIZE IN CACHE materialize_locally(spark=spark, config_filepath=resource_file, slide_step=file_content["slide_steps"], hostgroup_position=0) # save metadata also new_config_file_name = file_content["local_cache_folder"] + \ file_content["code_project_name"] + ".metadata" print("Save the config file locally: %s" % new_config_file_name) with open(new_config_file_name, 'w') as file: json.dump(file_content, file) file.close() except Exception as e: myprint('FAILURE - DURING CACHE CREATION:') print("Detail Error: ", e) sys.exit(1) myprint("SUCCESS - CACHED DATA AVAILABLE LOCALLY") @cli.command() @click.option('--module_name', default="", help='''name of the reltive path of file containing the class.''') @click.option('--class_name', default="", help='''name of the class inside the file.''') @click.option('--alias_name', default=None, help='''name of the algo in the table.''') @click.option('--train_path', default="", help='''path of the JSON file for the description of the TRAIN.''') @click.option('--test_path', default="", help='''path of the JSON file for the description of the TEST.''') @click.option('--analysis_path', default="", help='''path of the JSON to describe the ANALYSIS.''') @click.option('--hyperparameters', default="", help='''dictionary of parameters to initialize the class.''') @click.option('--train_on_test', default="", help='''if you want to train only on the test window.''') @click.option('--subsample_for_train', default="", help='''nr of samples you want to extract to train the model.''') def analysis(module_name, class_name, alias_name, hyperparameters, train_path, test_path, analysis_path, train_on_test, subsample_for_train): """Analyse data and produce anomaly score.""" # parse the parameters in a different way because they are # in a json format: # e.g. --hyperparameters {"n_neighbours": 200, "alias_name": "LOF_200"} print("Start " + class_name) hyperparameters = json.loads(hyperparameters.replace("'", '"')) print("Hyperparameters Analyser:") print("type(hyperparameters) -> ", type(hyperparameters)) print("hyperparameters:", hyperparameters) train_on_test = str(train_on_test) print("train_on_test: ", train_on_test) subsample_for_train = int(subsample_for_train) print("subsample_for_train: ", subsample_for_train) # Initialize ANALYZER INSTANCE core_analyser_instance = \ create_instance(module_name=module_name, class_name=class_name, init_params_dict=hyperparameters) analyser_instance = \ analyser.PyODWrapperAnalyzer(core_analyser_instance) # READ ANALYSIS PARAMS # read yaml with open(analysis_path) as yaml_file: analysis_dict = yaml.safe_load(yaml_file) # old - read json # with open(analysis_path) as json_file: # analysis_dict = json.load(json_file) folder_training_time = analysis_dict["folder_training_time"] folder_score = analysis_dict["local_scores_folder"] # create folder if they do not exist Path(folder_training_time).mkdir(parents=True, exist_ok=True) Path(folder_score).mkdir(parents=True, exist_ok=True) PUBLISH_PER_WINDOW = analysis_dict["publish_per_windows"] RANDOM_SEED = analysis_dict["random_seed"] # HISTORY_LEN = analysis_dict["history_steps"] SLIDE_STEPS = analysis_dict["slide_steps"] # ALGO_NAME = analysis_dict["algo_name"] # PREPARE TO SEND STUFF TO MONIT analyser_instance.prepare_to_publish_config( config_file=train_path, slide_steps=SLIDE_STEPS) algo_name = class_name # SUBSAMPLE (IF SPECIFIED) SAMPLES_FOR_TRAIN = None if "max_samples_for_train" in analysis_dict.keys(): SAMPLES_FOR_TRAIN = analysis_dict["max_samples_for_train"] # USE A LARGER SAMPLE IF DEEP LEARNING BASED if ((class_name in ["AEDenseTF2", "AECnnTF2", "AELstmTF2", "ForecastCNN"]) and ("max_samples_for_train_deep" in analysis_dict.keys())): SAMPLES_FOR_TRAIN = analysis_dict["max_samples_for_train_deep"] # USE THE NUMEBR SPECIFIED AT THE ALGORITHM LEVEL if subsample_for_train > 0: SAMPLES_FOR_TRAIN = subsample_for_train # USE ALL THE SAMPLES if subsample_for_train == -1: SAMPLES_FOR_TRAIN = None # USE ONLY A SUBSET OF METRICS if "list_metrics_to_keep" in analysis_dict.keys(): LIST_METRICS_TO_KEEP = analysis_dict["list_metrics_to_keep"] else: LIST_METRICS_TO_KEEP = None # GET PATH file_path_config_train = train_path file_path_config_test = test_path # TRAIN X_train, prov_train, nr_timeseries = \ read_local_window_dataset(config_filepath=file_path_config_train, list_metrics_to_keep=LIST_METRICS_TO_KEEP) # TEST X_test, prov_test, nr_timeseries = \ read_local_window_dataset(config_filepath=file_path_config_test, list_metrics_to_keep=LIST_METRICS_TO_KEEP) print("Analyser instance before train: ", str(analyser_instance)) # get name and parameters (if any) algo_parameters = None if analyser_instance.__class__.__name__ == "PyODWrapperAnalyzer": all_parameters = \ inspect.getmembers(analyser_instance.pyod_detector, lambda a: not(inspect.isroutine(a))) algo_parameters = \ dict([("param_" + p[0], str(p[1])) for p in all_parameters if p[0][0] != "_"]) # use the alias to identify the algo (if available) # otherwise use the class name algorithm_name = class_name if alias_name is not None: algorithm_name = alias_name print("Algorithm name: ", algorithm_name) print("Algo str representation: ", str(analyser_instance)) # skip the train if the algo is used to fit - predict directly # at test time on one single temporal window of the various machines # in the swarn if train_on_test == "False": print("Full model training (e.g. on the entire prev week)") algo = analyser_instance # TRAIN MODEL train_dataset = X_train algo_name = str(algo) start_fit = time.time() if SAMPLES_FOR_TRAIN is not None: if len(train_dataset) > SAMPLES_FOR_TRAIN: # noqa print("Subsampling: ", SAMPLES_FOR_TRAIN) np.random.seed(RANDOM_SEED) # randomly select the desired number of samples train_dataset = \ X_train.sample( n=SAMPLES_FOR_TRAIN, random_state=np.random.RandomState(seed=RANDOM_SEED)) if class_name == "ForecastVAR": # forward the provenance to the VAR learner because needs it to # reconstruct the time series starting from the windows of the # same machine algo.fit(train_dataset, prov=prov_train, forward_prov=True) else: # FIT THE METHOD print("Fitting the methods...") algo.fit(train_dataset, prov=prov_train) end_fit = time.time() # register time required by this algo to fit the data training_time = float(end_fit - start_fit) # SAVE TRAINING TIME with open(file_path_config_train) as json_file: data_dict_train = json.load(json_file) # connect to the db conn = sqlite3.connect(folder_training_time + '/time.db', timeout=120) modify_db( conn = conn, query = '''CREATE TABLE IF NOT EXISTS time (date_start text, date_end_excluded text, long_algo_description text, training_time real, measurement_time text, PRIMARY KEY (date_start, date_end_excluded, long_algo_description, measurement_time))''', upperbound = 10) modify_db( conn = conn, query = '''INSERT INTO time VALUES (?, ?, ?, ?, datetime('now', 'localtime'))''', upperbound = 10, params = [data_dict_train["date_start"], data_dict_train["date_end_excluded"], algo_name, training_time]) conn.close() # with open(file_path_config_train) as json_file: # data_dict_train = json.load(json_file) # file_training_time = \ # folder_training_time + "/time_" + data_dict_train["date_start"] + \ # "_" + data_dict_train["date_end_excluded"] + "_" + \ # str(int(time.time())) + ".json" # with open(file_training_time, 'w') as outfile: # print("Training time - location: %s" % # file_training_time) # json.dump(training_time, outfile) # # ITERATE AND PUBLISH algo = analyser_instance print(algo) # keep a reference copy of the not-trained model for the analyser # a brand new copy will be used for every window if train_on_test == "True": reference_copy_algo = copy.deepcopy(algo) print("Deep reference copy created") timestamps = sorted(prov_test["timestamp"].unique()) for ts in timestamps: print("Current window raw ts: ", ts) mask_current_window = prov_test["timestamp"] == ts X_window = X_test[mask_current_window] prov_window = prov_test[mask_current_window] # focus on a window it it is not empty if (len(X_window) > 1): print("Enough data: ok") print("-" * 60) print("Timestamp: ", prov_window.iloc[0, 1]) print("=" * 60) # run algorithm if train_on_test == "True": print("TRAIN ON TEST") print("We test directly on new data at test time.") print("Training a brandnew model for this window") algo = copy.deepcopy(reference_copy_algo) try: algo.fit(X_window, prov=prov_window) outlier_scores = algo.get_train_scores() except AttributeError as e: print(e) print("No train scores saved during train...") print("Never mind, we directly run on the same data") outlier_scores =\ algo.predict(X_window, prov=prov_window) else: print("Predict with the trained model:") outlier_scores = algo.predict_std(X_window, prov=prov_window) # get most critical k = PUBLISH_PER_WINDOW critical_individuals = \ [(prov_window.iloc[i, :][0], outlier_scores[i]) for i in np.argsort(outlier_scores)[::-1]] print("hostname | anomaly score") [print(c) for c in critical_individuals[:k]] # PUBLISH in FLUENTD # create ordered score # PercScore has no attribude std_scores if class_name == "PercScore": standardized_values_tmp = False else: standardized_values_tmp = True ordered_hosts, ordered_scores = zip(*critical_individuals) algo.publish_top_k(ts_second_window_end=ts, top_k=PUBLISH_PER_WINDOW, standardized_values=standardized_values_tmp, validity=True) print("Save to Parquet...") # SAVE TO PARQUET save_scores_local_parquet( algorithm_name=algorithm_name, algo_parameters=algo_parameters, score_folder=folder_score, config_file=file_path_config_test, scores=ordered_scores, entities=ordered_hosts, end_window=ts) print("Iteration on windows finished") pass def main(): cli() if __name__ == '__main__': main()