#!/usr/bin/env python import click import sqlite3 import os import pandas as pd import yaml import seaborn as sns import matplotlib # noqa import matplotlib.pyplot as plt # noqa import numpy as np import miscellanea.experiment_cern_dataset as exp_cern_ds from datetime import datetime from pathlib import Path import sklearn.metrics from miscellanea.experiment_comparison import plot_roc # noqa from sklearn.metrics import precision_recall_curve # noqa import sklearn from adcern.sqlite3_backend import modify_db # example of start and end of week MY_START_WEEK = datetime(year=2020, month=8, day=9) MY_END_WEEK = datetime(year=2020, month=8, day=16) # CONSOLE APPLICATION # see https://github.com/pallets/click/issues/1123 def normalize_names(name): return name.replace("_", "-") @click.group(context_settings={"token_normalize_func": normalize_names}) def cli(): print("Welcome in the visualization CLI.") @cli.command() @click.option('--input_folder', default="", help='''path where to look for parquet files to merge.''') @click.option('--output_folder', default="", help='''path where to save combined_scores.parquet.''') def merge_db(input_folder, output_folder): """Merge Parquet DB in the folder.""" # get db files onlyfiles = [f for f in os.listdir(input_folder) if os.path.isfile(os.path.join(input_folder, f))] db_files = [f for f in onlyfiles if ((f[:7] == "scores_") and (f[-8:] == ".parquet"))] print("db_files:") print(db_files) df_all_algos_all_windows = [] # get scores of every algo-window for parquet_filename in db_files: print(parquet_filename) df_single_algo = \ pd.read_parquet(input_folder + "/" + parquet_filename) df_all_algos_all_windows.append(df_single_algo) df_all = pd.concat(df_all_algos_all_windows, ignore_index=True) df_all.to_parquet(output_folder + "/combined_scores.parquet", index=False) @cli.command() @click.option('--hostgroup', default="", help='''full name of the hostgroup to extract.''') @click.option('--input_folder', default="", help='''path where to find labels and combined_scores.parquet. ''') @click.option('--output_folder', default="", help='''path where to save the png plot file.''') @click.option('--fixed_scale', default="True", help='''if you want to bing your scores on a fixed scale.''') def create_corr_score(hostgroup, input_folder, output_folder, fixed_scale): """Plot the correlations among scores.""" print('Creating correlation scores!') # READ ALL SCORES df_all = pd.read_parquet(input_folder + "/combined_scores.parquet") # READ ALL LABELS # get the labels name_hostgroup = hostgroup.split("/")[-1] in_csv = input_folder + "/" + name_hostgroup + "_labels.csv" df_labels = pd.read_csv(in_csv, index_col=0) # melt labels df_labels_melt = pd.melt(df_labels.reset_index(), id_vars='index', value_vars=list(df_labels.columns)) df_labels_melt["index"] = pd.to_datetime(df_labels_melt["index"]) df_labels_melt.set_index("index", inplace=True) df_labels_melt.rename(columns={'value': 'label', 'variable': 'hostname'}, inplace=True) df_labels_melt['hostname'] = \ df_labels_melt['hostname'].apply(lambda y: y + ".cern.ch") df_labels_melt.reset_index(drop=False, inplace=True) all_algos = list(df_all["algorithm"].unique()) print("ALGOS: ", all_algos) HEIGHT = 3 ROWS = len(all_algos) fig, axes = plt.subplots(ncols=ROWS, nrows=ROWS) fig.set_size_inches(ROWS * HEIGHT, ROWS * HEIGHT) fig.tight_layout(pad=3) # FOR EVERY ALGO ------------------------------------------------------- for i, algo_name_row in enumerate(all_algos): for j, algo_name_col in enumerate(all_algos): # get current axes c_ax = axes[i][j] print("i=%i, j=%i, row=%s, col=%s" % (i, j, algo_name_row, algo_name_col)) # GET SCORES OF TWO ALGOs ---------------------------------------- df_scores_algo_row = df_all[df_all["algorithm"] == algo_name_row] df_scores_algo_col = df_all[df_all["algorithm"] == algo_name_col] # COMBINE SCORES TWO ALGOS df_algos = \ df_scores_algo_row.merge( df_scores_algo_col, on=['end_window', 'hostname', 'hostgroup']) df_algos.rename(columns={"score_x": "score_row", "score_y": "score_col"}, inplace=True) df_algos = df_algos[["end_window", "hostname", "score_row", "score_col"]] # FIX TIME OF ALGOS - TIMECHANGE df_algos["end_window"] = df_algos["end_window"].astype('int') df_algos["timestamp"] = \ pd.to_datetime(df_algos["end_window"] + 60 * 60, unit='s') df_algos["index"] = pd.to_datetime(df_algos["timestamp"]) df_algos.index = df_algos["index"] df_algos = exp_cern_ds.change_time_for_scores(df_algos) # COMBINE - LABELS VS SCORES ------------------------------------- df_to_evaluate = \ df_algos.merge(df_labels_melt, on=['index', 'hostname']) df_to_evaluate.reset_index(drop=False, inplace=True) # REPLACE - DROP MIX AND EMPTY WINDOWS df_to_evaluate = df_to_evaluate.replace(2, np.NaN) df_to_evaluate = df_to_evaluate.dropna(axis=0) # TRUTH ---------------------------------------------------------- # plot anomalies on topo of noraml if on the upper part of the # matrix df_to_evaluate = \ df_to_evaluate[df_to_evaluate["label"] == int(i > j)] if (i > j): c_ax.set_title("Anomalies") my_color = "darkorange" else: c_ax.set_title("Normal") my_color = "dodgerblue" scores_row = list(df_to_evaluate['score_row']) scores_col = list(df_to_evaluate['score_col']) sns.scatterplot(y=scores_row, x=scores_col, color=my_color, ax=c_ax, marker="x") if fixed_scale == "True": lim_min = -2.5 lim_max = 10 c_ax.set_ylim((lim_min, lim_max)) c_ax.set_xlim((lim_min, lim_max)) c_ax.set_xlabel(algo_name_col) c_ax.set_ylabel(algo_name_row) # c_ax.legend() c_ax.grid() plt.savefig(output_folder + "/corr_output.png") @cli.command() @click.option('--hostgroup', default="", help='''full name of the hostgroup to inspect.''') @click.option('--input_folder', default="", help='''path where to look for labels.''') def label_presence(hostgroup, input_folder): """Check if labels for this hostgroup are already present.""" name_hostgroup = hostgroup.split("/")[-1] in_csv = input_folder + "/" + name_hostgroup + "_labels.csv" df_labels = pd.read_csv(in_csv, index_col=0) print("We have %i labels for the hostgroup: %s." % (len(df_labels), hostgroup)) @cli.command() @click.option('--grafana_token', default="", help='''path to the grafana token.''') @click.option('--hostgroup', default="", help='''full name of the hostgroup to extract.''') @click.option('--output_folder', default="", help='''path where to save the csv file.''') def extract_annotation(grafana_token, hostgroup, output_folder): """Extract the annotation from grafana and save a csv file.""" # API call to grafana jres = \ exp_cern_ds.query_for_annotations( hostgroups=[hostgroup], file_path_token=grafana_token) # CONVERT TO DATAFRAME df_annotations = exp_cern_ds.convert_to_dataframe(jres) # SAVE TO FILE name_hostgroup = hostgroup.split("/")[-1] Path(output_folder).mkdir(parents=True, exist_ok=True) out_csv = \ output_folder + "/" + name_hostgroup + "_annotations.csv" print("annotations csv out: ", out_csv) df_annotations.to_csv(out_csv, index=False) @cli.command() @click.option('--hostgroup', default="", help='''full name of the hostgroup to extract.''') @click.option('--input_folder', default="", help='''path where to find annotation csv file.''') @click.option('--analysis_file', default="", help='''json with info about the window size of analysis.''') @click.option('--config_file', default="", help='''json with info about the granularity.''') @click.option('--output_folder', default="", help='''path where to save the csv file.''') def create_labels(hostgroup, input_folder, analysis_file, config_file, output_folder): """Convert interval annotations into window labels. The window length is taken from the analysis_file. """ # read the interval annotations name_hostgroup = hostgroup.split("/")[-1] in_csv = input_folder + "/" + name_hostgroup + "_annotations.csv" df_annotations = pd.read_csv(in_csv) # read the timing of analysis - how long is a non overlapping window # read yaml with open(analysis_file) as yaml_file: config_dict_analysis = yaml.safe_load(yaml_file) # read json # with open(analysis_file) as json_file: # config_dict_analysis = json.load(json_file) # read yaml with open(config_file) as yaml_file: config_dict_etl = yaml.safe_load(yaml_file) # with open(config_file) as json_file: # config_dict_etl = json.load(json_file) steps_in_a_window = int(config_dict_analysis["slide_steps"]) granularity = int(config_dict_etl["aggregate_every_n_minutes"]) print("steps_in_a_window: ", steps_in_a_window) print("granularity: ", granularity) # convert into windows df_labels = \ exp_cern_ds.count_per_interval( df_raw_annotations=df_annotations, nr_min_in_a_window=steps_in_a_window * granularity) out_csv = output_folder + "/" + name_hostgroup + "_labels.csv" print("window labels csv out: ", out_csv) df_labels.to_csv(out_csv) @cli.command() @click.option('--hostgroup', default="", help='''full name of the hostgroup to extract.''') @click.option('--input_folder', default="", help='''path where to find labels csv file.''') @click.option('--output_folder', default="", help='''path where to save the plotting image.''') def visualize_labels(hostgroup, input_folder, output_folder): """Visualize label.""" # read the labels name_hostgroup = hostgroup.split("/")[-1] in_csv = input_folder + "/" + name_hostgroup + "_labels.csv" df_labels = pd.read_csv(in_csv, index_col=0) exp_cern_ds.visualize_heatmap_annotations(df_labels, figsize=(12, 20)) plt.savefig(output_folder + "/" + name_hostgroup + "_labels.png", bbox_inches='tight') @cli.command() @click.option('--folder_scores', default="", help='''the path to the folder with the sqlite3 database named scores.db ''') @click.option('--hostgroup', default="", help='''full name of the hostgroup to extract.''') @click.option('--labels_folder', default="", help='''path where to find labels csv file.''') @click.option('--algo_name', default="", help='''name of the algorithm to score.''') @click.option('--family', default="", help='''either: Traditional, Deep, Ensemble.''') @click.option('--start', default="", help='''start of evaluation period. Note the unusual presence of underscore "_" e.g. "2020-02-13_16:00:00"''') @click.option('--end', default="", help='''end of evaluation period. Note the unusual presence of underscore "_" e.g. "2020-03-01_00:00:00"''') def score_benchmark(folder_scores, hostgroup, labels_folder, algo_name, family, start, end): """Score the algorithm anomaly scores agains the labels.""" # get labels name_hostgroup = hostgroup.split("/")[-1] in_csv = labels_folder + "/" + name_hostgroup + "_labels.csv" df_labels = pd.read_csv(in_csv, index_col=0) # melt label matrix into a database df_labels_melt = pd.melt(df_labels.reset_index(), id_vars='index', value_vars=list(df_labels.columns)) df_labels_melt.set_index("index", inplace=True) df_labels_melt.rename(columns={'value': 'label', 'variable': 'hostname'}, inplace=True) df_labels_melt['hostname'] = \ df_labels_melt['hostname'].apply(lambda y: y + ".cern.ch") df_labels_melt.reset_index(drop=False, inplace=True) df_labels_melt["index"] = pd.to_datetime(df_labels_melt["index"]) # get scores # df_all contains all_algos and all_windows df_all = pd.read_parquet(folder_scores + "/combined_scores.parquet") df_algo = df_all[df_all["algorithm"] == algo_name] df_algo["end_window"] = df_algo["end_window"].astype('int') df_algo["timestamp"] = \ pd.to_datetime(df_algo["end_window"] + 60 * 60, unit='s') df_algo["index"] = pd.to_datetime(df_algo["timestamp"]) df_algo.index = df_algo["index"] # TODO timezones handling - problems after April df_algo = exp_cern_ds.change_time_for_scores(df_algo) if start == "" or end == "": raise Exception("You must explicetely declare start and end of your" "benchmark") start = start.replace("_", " ") end = end.replace("_", " ") df_algo = df_algo[start: end] df_algo["index"] = df_algo.index # compare - merge scores and labels df_all = df_algo.merge(df_labels_melt, on=['index', 'hostname']) df_all.reset_index(drop=False, inplace=True) df_all = df_all[["index", "hostname", "score", "label", "timestamp"]] # create metrics df_to_evaluate = df_all.copy() # REPLACE - DROP MIX AND EMPTY WINDOWS -------------------------------- df_to_evaluate = df_to_evaluate.replace(2, np.NaN) df_to_evaluate = df_to_evaluate.dropna(axis=0) # TRUTH - ALL WEEKs ---------------------------------------------------- truth = list(df_to_evaluate['label']) # noqa my_guess = list(df_to_evaluate['score']) # noqa # ADD WEEKs ------------------------------------------------------------ df_to_evaluate["week"] = \ df_to_evaluate["index"].apply( lambda x: exp_cern_ds.assign_week(x, dt_start_week=MY_START_WEEK, dt_end_week=MY_END_WEEK)) # MULTIPLE ROC --------------------------------------------------------- weeks_available = list(df_to_evaluate["week"].unique()) aucs_weeks = [] # connect to the db conn_score = sqlite3.connect(labels_folder + '/week_metrics.db', timeout=120) # ensure the table is there modify_db( conn = conn_score, query = '''CREATE TABLE IF NOT EXISTS auc (hostgroup text, algorithm text, family text, auc_score real, week_index int, end_week int, PRIMARY KEY (hostgroup, algorithm, end_week))''', upperbound = 10) # FOR EVERY WEEK ------------------------------------------------------- for w in sorted(weeks_available): print("WEEK: ", w, end=" - ") df_this_week = df_to_evaluate[df_to_evaluate["week"] == w] dt_end_week = df_this_week["timestamp"].max() end_week = int((dt_end_week - datetime(1970, 1, 1)).total_seconds()) weekly_truth = list(df_this_week['label']) weekly_my_guess = list(df_this_week['score']) fpr, tpr, trs_roc = \ sklearn.metrics.roc_curve(weekly_truth, weekly_my_guess) roc_auc = sklearn.metrics.auc(fpr, tpr) # plot_roc(fpr=fpr, tpr=tpr, roc_auc=roc_auc, # ax=ax_multiple_roc, alpha=0.2) # ALL ROCS in ONE PIC # plot_roc(fpr=fpr, tpr=tpr, roc_auc=roc_auc, # ax=dict_ax_rocs[algo_name], alpha=0.2) print("AUC: ", roc_auc) aucs_weeks.append(roc_auc) modify_db( conn = conn_score, query = '''INSERT OR IGNORE INTO auc VALUES (?, ?, ?, ?, ?, ?)''', upperbound = 10, params = (hostgroup, algo_name, family, roc_auc, int(w), end_week)) conn_score.close() # # CUMULATIVE QUANTITIES # HEIGHT = 4 # fig, axes = plt.subplots(ncols=3, nrows=1) # fig.set_size_inches(3 * HEIGHT, HEIGHT) # fig.tight_layout(pad=3) # ax_dist = axes[0] # ax_roc = axes[1] # ax_threshols = axes[2] # # DISTRIBUTION --------------------------------------------------------- # bins = np.linspace(min(my_guess), max(my_guess), num=100) # scores_for_outliers = \ # [s for is_anomalous, s in zip(truth, my_guess) if is_anomalous] # scores_for_normals = \ # [s for is_anomalous, s in zip(truth, my_guess) if not is_anomalous] # sns.distplot(scores_for_normals, bins=bins, label="Normal", # ax=ax_dist, norm_hist=True, kde=False, color="dodgerblue") # sns.distplot(scores_for_outliers, bins=bins, label="Anomaly", # ax=ax_dist, norm_hist=True, kde=False, color="darkorange") # ax_dist.legend() # # UNIQUE ROC ----------------------------------------------------------- # fpr, tpr, trs_roc = \ # sklearn.metrics.roc_curve(truth, my_guess) # roc_auc = sklearn.metrics.auc(fpr, tpr) # plot_roc(fpr=fpr, tpr=tpr, roc_auc=roc_auc, ax=ax_roc) # ax_roc.set_title("Cumulative ROC - AUC=%.3f" % roc_auc) # # THRESHOLDS --------------------------------------------------------- # pr, rec, tr = \ # precision_recall_curve(y_true=truth, probas_pred=my_guess) # f1 = [2 * (p * r) / (p + r) for (p, r) in zip(pr, rec)] # exp_cern_ds.plot_vs_thresholds(metric=pr[:-1], # metric_name="Precision", # thresholds=tr, ax=ax_threshols) # exp_cern_ds.plot_vs_thresholds(metric=rec[:-1], # metric_name="Recall (tpr)", # thresholds=tr, ax=ax_threshols) # exp_cern_ds.plot_vs_thresholds(metric=fpr, # metric_name="fpr", # thresholds=trs_roc, ax=ax_threshols) # exp_cern_ds.plot_vs_thresholds(metric=f1[:-1], # metric_name="F1", # thresholds=tr, ax=ax_threshols) # ax_threshols.legend() # ax_threshols.set_title("Thresholds Cum. Dist.") # # SAVE IMAGE # plt.savefig(labels_folder + "/cumulative_" + algo_name + ".png", # bbox_inches='tight') @cli.command() @click.option('--hostgroup', default="", help='''full name of the hostgroup to extract.''') @click.option('--input_folder', default="", help='''path where to find week_metrics.db file.''') @click.option('--output_folder', default="", help='''path where to save the plotting image.''') def visualize_auc(hostgroup, input_folder, output_folder): """Visualize the AUC results form the selected algo.""" # read the database of scores in a pandas DF conn = sqlite3.connect(input_folder + "/week_metrics.db", timeout = 120) df_week_auc = pd.read_sql_query( "SELECT * FROM auc WHERE hostgroup='{}'".format(hostgroup), conn) conn.close() # BARPLOT AUC fig, ax = plt.subplots(1, 1) fig.set_size_inches(10, 5) result = \ df_week_auc.groupby(["algorithm"])['auc_score'].aggregate(np.mean)\ .reset_index().sort_values('auc_score') sns.barplot(x="algorithm", y="auc_score", data=df_week_auc.rename(columns={"family": "Family"}), hue="Family", order=result['algorithm'], dodge=False, capsize=.15, errwidth=2.1, ax=ax) ax.set_title("Performance AUC-ROC averaged over the weeks", size=24) ax.set_ylabel("Average AUC-ROC") ax.set_ylim(0.5, 1) ax.set_xlabel("Anomaly Detection Methods") plt.xticks(rotation=90) ax.yaxis.label.set_size(18) ax.xaxis.label.set_size(18) plt.grid() plt.savefig(output_folder + "/auc_comparison_barplot.png", bbox_inches='tight') # LINEPLOT - EVOLUTION df_week_auc.drop_duplicates(inplace=True) df_week_auc_wide = df_week_auc.pivot(index="week_index", columns="algorithm", values="auc_score") fig, ax = plt.subplots(1, 1) fig.set_size_inches(10, 5) # prepare styles nr_styles_required = len(df_week_auc["algorithm"].unique()) dash_styles = ["", (4, 1.5), (1, 1), (3, 1, 1.5, 1), (5, 1, 1, 1), (5, 1, 2, 1, 2, 1), (2, 2, 3, 1.5), (1, 2.5, 3, 1.2), (1, 2, 3, 4), (6, 1), (7, 1), (8, 1), (9, 1), (10, 1)] while len(dash_styles) < nr_styles_required: dash_styles.append("") # prepare colors - depending on the family rnd_week_index = list(df_week_auc["week_index"])[0] algos = list(df_week_auc[ df_week_auc["week_index"] == rnd_week_index]["algorithm"]) families = list(df_week_auc[ df_week_auc["week_index"] == rnd_week_index]["family"]) def get_color(x): if x == "Traditional": return 'darkorange' if x == "Deep": return'dodgerblue' if x == "Ensemble": return'red' colors = {k: get_color(v) for k, v in zip(algos, families)} sns.lineplot(data=df_week_auc_wide, ax=ax, dashes=dash_styles, palette=colors) ax.set_title("Evolution of AUC-ROC over weeks", size=24) ax.set_ylabel("Weekly AUC-ROC Score") ax.set_xlabel("Weeks evolution") ax.set_ylim(0.5, 1) ax.legend(loc='lower left') ax.yaxis.label.set_size(18) ax.xaxis.label.set_size(18) plt.grid() plt.savefig(output_folder + "/auc_comparison_evolution.png", bbox_inches='tight') @cli.command() @click.option('--input_folder', default="", help='''path where to remove week_metrics.db file.''') def remove_old_database(input_folder): """Remove old database for weekly AUC results.""" try: os.remove(input_folder + "/week_metrics.db") except Exception as e: print(e) def main(): cli() if __name__ == '__main__': main()