Commit 910b2586 authored by Antonin Dvorak

fix week numbers, add last+first timestamps to db, better error handling in elaborate_scores

parent 7fb7ba86
......@@ -29,7 +29,7 @@ EOS_DATALAKE_FOLDER = EOS_PROJECT_ROOT + "eos-datalake"
EOS_SCRIPT_AREA = \
SYSTEM_FOLDER + "repo-mirror/control_room/airflow-compose/task_bank"
EOS_TMP_CONFIG_AREA = \
"/eos/project/i/it-cloud-data-analytics/eos-datalake/dataset-benchmark/config_files/large_shared/"
"/eos/project/i/it-cloud-data-analytics/eos-datalake/dataset-benchmark/config_files/batch_small_antonin/"
EOS_FIXED_CONFIG_AREA = \
SYSTEM_FOLDER + "repo-mirror/control_room/config_files/"
......
......@@ -48,7 +48,7 @@ CONFIG_TEMPLATE_FILE_NAME_TEST = \
CONFIG_TEMPLATE_FILE_NAME_TRAIN = \
"publish_fluentd/batch_etl_train.yaml"
CONFIG_TEMPLATE_ANALYSIS = \
"demo/analysis_eos.yaml"
"demo/analysis_eos_batch.yaml"
# BENCHMARK BATCH
# CONFIG_TEMPLATE_FILE_NAME_TEST = "benchmark/batch_etl_test.yaml"
# CONFIG_TEMPLATE_FILE_NAME_TRAIN = "benchmark/batch_etl_train.yaml"
......
......@@ -14,7 +14,7 @@ export AIRFLOW_IMAGE=matteo-thesis-2020
export KRB_USER=advorak
-export KRB_USER_PASSW=replacement
+export KRB_USER_PASSW=placeholder
-# Secret to encrypt
+# Secret key used to encrypt connection passwords stored in the db.
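# A key of the kind Airflow expects for its fernet_key setting can be generated
# with (assumption: this secret feeds that setting; not confirmed by the commit):
#   python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"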
......
......@@ -11,6 +11,9 @@ import numpy as np
import miscellanea.experiment_cern_dataset as exp_cern_ds
from datetime import datetime
from datetime import date
import calendar
from pathlib import Path
import sklearn.metrics
......@@ -274,6 +277,31 @@ def visualizelabels(hostgroup, input_folder, output_folder):
bbox_inches='tight')
def assign_week(dt_day):
    """Map a timestamp to a "YYYY-WW" week label (weeks start on Sunday, as with %U)."""
    week_num = int(dt_day.strftime("%U"))
    year = dt_day.year
    # check whether the timestamp is exactly midnight
    is_midnight = (dt_day - dt_day.replace(hour=0, minute=0,
                                           second=0, microsecond=0)).total_seconds() == 0
    # name of the day of the week
    day_of_week = calendar.day_name[dt_day.weekday()]
    # a Sunday-midnight timestamp closes the previous week, so assign it there
    if day_of_week == 'Sunday' and is_midnight:
        week_num -= 1
        # if we leaped into the previous year, take its last week number;
        # Dec 28 always lies in the year's last week
        # (https://stackoverflow.com/a/29263010)
        if week_num == 0:
            year -= 1
            week_num = int(date(year, 12, 28).strftime("%U"))
    return str(year) + "-" + str(week_num).zfill(2)
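# Example behaviour (hypothetical dates; naive datetimes assumed):
#   assign_week(datetime(2020, 6, 7, 0, 0, 0))  -> "2020-22"  Sunday midnight,
#                                                  pushed to the previous week
#   assign_week(datetime(2020, 6, 7, 0, 0, 1))  -> "2020-23"  one second later
#   assign_week(datetime(2021, 1, 3, 0, 0, 0))  -> "2020-52"  Sunday midnight
#                                                  across a year boundary
# The zero-padded "YYYY-WW" labels sort chronologically as plain strings,
# which the sorted() calls below rely on.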
@cli.command()
@click.option('--folder_scores', default="",
help='''the path to the folder with the sqlite3 database named
......@@ -345,43 +373,51 @@ def scorebenchmark(folder_scores, hostgroup,
truth = list(df_to_evaluate['label']) # noqa
my_guess = list(df_to_evaluate['score']) # noqa
# ADD WEEKs ------------------------------------------------------------
df_to_evaluate["week"] = \
df_to_evaluate["index"].apply(
lambda x: exp_cern_ds.assign_week(x,
dt_start_week=MY_START_WEEK,
dt_end_week=MY_END_WEEK))
df_to_evaluate["week"] = df_to_evaluate["index"].apply(lambda x: assign_week(x))
# MULTIPLE ROC ---------------------------------------------------------
weeks_available = list(df_to_evaluate["week"].unique())
print("Available weeks:",weeks_available)
print("Sorted weeks:", sorted(weeks_available))
aucs_weeks = []
# connect to the db
conn_score = sqlite3.connect(labels_folder + '/week_metrics.db',
timeout=120)
+    upperbound = 10
     with conn_score:
         c = conn_score.cursor()
         # ensure the table is there
-        for x in range(0, 30):
+        for x in range(1, upperbound + 1):
+            print("Executing query for table creation. Attempt:", x)
             try:
                 c.execute('''CREATE TABLE IF NOT EXISTS auc
                              (hostgroup text, algorithm text, family text,
-                              auc_score real, week_index int, end_week int,
+                              auc_score real, week_index text, end_week int,
+                              start_date text, end_date text,
                               PRIMARY KEY
                               (hostgroup, algorithm, end_week))''')
-            except:
-                print("Error during sqlite3 execute, sleeping")
-                time.sleep(1)
+            except Exception as e:
+                print("Sqlite3 execute unsuccessful, reason: \"%s\"\nRetrying after %s sec..." % (str(e), x))
+                time.sleep(x)
                 pass
             else:
                 break
-        for x in range(0, 30):
+        if x == upperbound:
+            raise Exception("Sqlite3 - reached max retries to execute query with no success, giving up...")
+        for x in range(1, upperbound + 1):
             try:
                 conn_score.commit()
-            except:
-                print("Error during sqlite3 commit, sleeping")
-                time.sleep(1)
+            except Exception as e:
+                print("Sqlite3 commit unsuccessful, reason: \"%s\"\nRetrying after %s sec..." % (str(e), x))
+                time.sleep(x)
                 pass
             else:
                 break
+        if x == upperbound:
+            raise Exception("Sqlite3 - reached max retries to commit with no success, giving up...")
# FOR EVERY WEEK -------------------------------------------------------
for w in sorted(weeks_available):
......@@ -389,6 +425,8 @@ def scorebenchmark(folder_scores, hostgroup,
df_this_week = df_to_evaluate[df_to_evaluate["week"] == w]
dt_end_week = df_this_week["timestamp"].max()
end_week = int((dt_end_week - datetime(1970, 1, 1)).total_seconds())
start_date = str(df_this_week["timestamp"].min())
end_date = str(df_this_week["timestamp"].max())
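# note: for naive UTC timestamps the epoch arithmetic above is equivalent to
# calendar.timegm(dt_end_week.timetuple()) (calendar is already imported)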
weekly_truth = list(df_this_week['label'])
weekly_my_guess = list(df_this_week['score'])
fpr, tpr, trs_roc = \
......@@ -401,29 +439,35 @@ def scorebenchmark(folder_scores, hostgroup,
# ax=dict_ax_rocs[algo_name], alpha=0.2)
print("AUC: ", roc_auc)
aucs_weeks.append(roc_auc)
-            for x in range(0, 30):
+            for x in range(1, upperbound + 1):
                 try:
                     c.execute('''INSERT OR IGNORE INTO auc
-                                 VALUES (?, ?, ?, ?, ?, ?)''',
-                              (hostgroup, algo_name, family, roc_auc, int(w), end_week))
+                                 VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
+                              (hostgroup, algo_name, family, roc_auc, w, end_week, start_date, end_date))
-                except:
-                    print("Error during sqlite3 execute, sleeping")
-                    time.sleep(1)
+                except Exception as e:
+                    print("Sqlite3 execute unsuccessful, reason: \"%s\"\nRetrying after %s sec..." % (str(e), x))
+                    time.sleep(x)
                     pass
                 else:
+                    print("Successfully executed.")
                     break
+            if x == upperbound:
+                raise Exception("Sqlite3 - reached max retries to execute query with no success, giving up...")
-            for x in range(0, 30):
+            for x in range(1, upperbound + 1):
                 try:
                     conn_score.commit()
-                except:
-                    print("Error during sqlite3 commit, sleeping")
-                    time.sleep(1)
+                except Exception as e:
+                    print("Sqlite3 commit unsuccessful, reason: \"%s\"\nRetrying after %s sec..." % (str(e), x))
+                    time.sleep(x)
                     pass
                 else:
+                    print("Successfully committed.")
                     break
+            if x == upperbound:
+                raise Exception("Sqlite3 - reached max retries to commit with no success, giving up...")
conn_score.close()
# # CUMULATIVE QUANTITIES
......
---
# ----------------------------------------------------------------
# CONFIGURATION FILE TO DEFINE HOW TO ANALYSE YOUR DATA
# ----------------------------------------------------------------
# ----------------------------------------------------------------
# WINDOW OF ANALYSIS
# ----------------------------------------------------------------
# The length of every window in terms of steps.
# This number is dependent on the data we feed to the algorithm. This
# value should typically match the corresponding ETL config file for the test.
history_steps: 48
# The number of steps between two subsequent windows.
# This number depends on the data we feed to the algorithm. This
# value should typically match the corresponding ETL config file for the test.
# Note that when slide_steps has the same value as history_steps we have a
# non-overlapping scenario.
slide_steps: 48
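# For intuition, window start indices under this tiling (hypothetical helper,
# not part of the repo):
#   def window_starts(n_steps, history_steps=48, slide_steps=48):
#       return list(range(0, n_steps - history_steps + 1, slide_steps))
#   window_starts(144)  # -> [0, 48, 96], i.e. three non-overlapping windows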
# ----------------------------------------------------------------
# ALGORITHMs
# ----------------------------------------------------------------
# Algorithms to use for the analysis
# The keys correspond to the identifier of the algorithm.
# Note that it should be unique since this will be used on all your plots.
# Every element will have the following sub-keys:
# - import_path: the path to the algorithm's class.
# e.g. path.of.module.classname.
# e.g "adcern.analyser.AEDenseTF2"
# e.g "pyod.models.pca.PCA"
# the last token is then split off at the final "." and we do this:
# from pyod.models.pca import PCA
# and instantiate the object PCA()
# - parameters: dictionary of parameters to pass to the class of the algo
# during its initialization
# - train_on_test: default False. If True there won't be any training phase;
# every algorithm will operate directly on the current windows at test time.
# The only thing that comes from the train data is the normalization: the
# test data on which we perform direct prediction are normalized with the
# mu and sigma of the data extracted by the ETL for the train period.
# - subsample_for_train: default 0. It defines the number of training samples
# to extract to train your model. If -1 all the available data will be used.
# If 0 it uses the default values in the max_samples_for_train field.
# Note that subsample_for_train is used in combination with
# train_on_test = False, because this allows for a bigger training set where
# subsampling makes sense.
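# A sketch of the dynamic import described above (assuming a plain importlib
# loader; the repo's actual loader may differ):
#   from importlib import import_module
#   module_path, class_name = "pyod.models.pca.PCA".rsplit(".", 1)
#   algo = getattr(import_module(module_path), class_name)(**parameters)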
algo_and_params:
#PCA_S1000:
# import_path: pyod.models.pca.PCA
# family: Traditional
# train_on_test: False
# subsample_for_train: 1000
# parameters: {}
# PCA_SALL:
# import_path: pyod.models.pca.PCA
# family: Traditional
# train_on_test: False
# subsample_for_train: -1
# parameters: {}
#LOF_200_S1000:
# import_path: pyod.models.lof.LOF
# family: Traditional
# train_on_test: False
# subsample_for_train: 1000
# parameters:
# n_neighbors: 200
#KNN_200_S1000:
# import_path: pyod.models.knn.KNN
# family: Traditional
# train_on_test: False
# subsample_for_train: 1000
# parameters:
# n_neighbors: 200
#OCSVM_S1000:
# import_path: pyod.models.ocsvm.OCSVM
# family: Traditional
# train_on_test: False
# subsample_for_train: 1000
# parameters:
# nu: 0.1
  IForest_S1000:
    import_path: pyod.models.iforest.IForest
    family: Traditional
    train_on_test: False
    subsample_for_train: 1000
    parameters:
      n_estimators: 100
# IForest_SALL:
# import_path: pyod.models.iforest.IForest
# family: Traditional
# train_on_test: False
# subsample_for_train: -1
# parameters:
# n_estimators: 100
  PERC_85_SALL:
    import_path: adcern.analyser_baseline.PercScore
    family: Traditional
    train_on_test: True
    subsample_for_train: -1
    parameters:
      nr_timeseries: 16
      nr_timesteps: 48
      percentage_above: 0.85
# PERC_60_SALL:
# import_path: adcern.analyser_baseline.PercScore
# family: Traditional
# train_on_test: True
# subsample_for_train: -1
# parameters:
# nr_timeseries: 16
# nr_timesteps: 48
# percentage_above: 0.60
  AE_LSTM_SALL:
    import_path: adcern.analyser_deep.AELstmTF2
    family: Deep
    train_on_test: False
    subsample_for_train: -1
    parameters:
      nr_timeseries: 16
      nr_timesteps: 48
      epochs: 20
      verbose: 0
#AE_Dense_SALL:
# import_path: adcern.analyser_deep.AEDenseTF2
# family: Deep
# train_on_test: False
# subsample_for_train: -1
# parameters:
# epochs: 20
# verbose: 0
#AE_CNN_SALL:
# import_path: adcern.analyser_deep.AECnnTF2
# family: Deep
# train_on_test: False
# subsample_for_train: -1
# parameters:
# nr_timeseries: 16
# nr_timesteps: 48
# epochs: 20
# verbose: 0
# ForecastCNN_SALL:
# import_path: adcern.analyser_forecasting.ForecastCNN
# family: Deep
# train_on_test: False
# subsample_for_train: -1
# parameters:
# nr_timeseries: 16
# nr_timesteps: 48
# chunk_len: 6
# epochs: 20
# verbose: 0
# pyod.models.iforest.IForest: {}
# pyod.models.pca.PCA: {}
# pyod.models.ocsvm.OCSVM:
# nu: 0.1
# adcern.analyser_baseline.PercScore:
# nr_timeseries: 16
# nr_timesteps: 48
# percentage_above: 0.85
# pyod.models.knn.KNN: {}
# adcern.analyser_deep.AEDenseTF2:
# epochs: 20
# verbose: 0
# adcern.analyser_deep.AECnnTF2:
# nr_timeseries: 16
# nr_timesteps: 48
# epochs: 20
# verbose: 0
# adcern.analyser_deep.AELstmTF2:
# nr_timeseries: 16
# nr_timesteps: 48
# epochs: 20
# verbose: 0
# adcern.analyser_forecasting.ForecastCNN:
# nr_timeseries: 16
# nr_timesteps: 48
# chunk_len: 6
# epochs: 20
# verbose: 0
# ----------------------------------------------------------------
# SUB-SAMPLING
# ----------------------------------------------------------------
# Max number of samples we want to allow for training traditional algorithms
max_samples_for_train: 1000
# Max number of samples we want to allow for training deep algorithms
max_samples_for_train_deep: 10000
# Random seed used to subsample the data to feed to the algorithm.
# This is used only if max_samples_for_train or max_samples_for_train_deep
# are set, otherwise no subsampling is performed.
random_seed: 42
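# The subsampling this seed controls could look like (hypothetical,
# numpy-based sketch; names are illustrative):
#   rng = np.random.RandomState(random_seed)
#   idx = rng.choice(len(X_train), size=min(max_samples_for_train, len(X_train)),
#                    replace=False)
#   X_train = X_train[idx]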
# ----------------------------------------------------------------
# ANALYSIS OUTPUT
# ----------------------------------------------------------------
# Path to save the training time of your algo.
# Note that this path should be accessible to the executor of the analysis
# (e.g. your VM or your container). Typically this is in a local folder if
# we work with our VM as main executor or in a shared volume if we work on
# a cluster.
folder_training_time: "/eos/project/i/it-cloud-data-analytics/Airflow/publication-v0.3/experiment_shared_large/time/"
# Path to save the anomaly scores created by your algo.
# Note that this path should be accessible to the executor of the analysis
# (e.g. your VM or your container). Typically this is in a local folder if
# we work with our VM as main executor or in a shared volume if we work on
# a cluster.
local_scores_folder: "/eos/project/i/it-cloud-data-analytics/Airflow/publication-v0.3/experiment_shared_large/scores/"
# Number of most serious anomalies to send to MONIT for every temporal
# window of analysis.
# Note that this applies only if your analysis executor is properly connected
# to the MONIT infrastructure via Fluentd (Docker --log-driver option).
# If not sure, ignore this.
publish_per_windows: 4
# ----------------------------------------------------------------
# ANNOTATION EVALUATION
# ----------------------------------------------------------------
# Hostgroup name in the form of an absolute path.
# Note that this will also be used to retrieve the annotations from Grafana.
hostgroup_abs_path: "cloud_compute/level2/main/gva_shared_012"
# Start and End date of the benchmark you are running.
# Only annotations in this range will be considered.
# Note that the underscore between the date and the time is essential,
# because this value is passed as a parameter on the command line.
# Note that if the DAGs experiment has produced fewer scores than the interval
# specified here, only the intersection will be considered.
start_benchmark: "2020-06-01_16:00:00"
end_benchmark: "2020-07-01_00:00:00"
# Path to save the artifacts of the evaluation section
# Note that it might contain: annotations, plots, summarized table etc.
evaluation_artifact_path: "/eos/project/i/it-cloud-data-analytics/Airflow/publication-v0.3/experiment_shared_large/results/"
...
......@@ -81,13 +81,13 @@ algo_and_params:
# subsample_for_train: 1000
# parameters:
# nu: 0.1
-  IForest_S1000:
-    import_path: pyod.models.iforest.IForest
-    family: Traditional
-    train_on_test: False
-    subsample_for_train: 1000
-    parameters:
-      n_estimators: 100
+  #IForest_S1000:
+  #  import_path: pyod.models.iforest.IForest
+  #  family: Traditional
+  #  train_on_test: False
+  #  subsample_for_train: 1000
+  #  parameters:
+  #    n_estimators: 100
# IForest_SALL:
# import_path: pyod.models.iforest.IForest
# family: Traditional
......@@ -113,16 +113,16 @@ algo_and_params:
# nr_timeseries: 16
# nr_timesteps: 48
# percentage_above: 0.60
-  AE_LSTM_SALL:
-    import_path: adcern.analyser_deep.AELstmTF2
-    family: Deep
-    train_on_test: False
-    subsample_for_train: -1
-    parameters:
-      nr_timeseries: 5
-      nr_timesteps: 48
-      epochs: 20
-      verbose: 0
+  #AE_LSTM_SALL:
+  #  import_path: adcern.analyser_deep.AELstmTF2
+  #  family: Deep
+  #  train_on_test: False
+  #  subsample_for_train: -1
+  #  parameters:
+  #    nr_timeseries: 5
+  #    nr_timesteps: 48
+  #    epochs: 20
+  #    verbose: 0
#AE_Dense_SALL:
# import_path: adcern.analyser_deep.AEDenseTF2
# family: Deep
......@@ -206,14 +206,14 @@ random_seed: 42
# (e.g. your VM or your container). Typically this is in a local folder if
# we work with our VM as main executor or in a shared volume if we work on
# a cluster.
folder_training_time: "/eos/project/i/it-cloud-data-analytics/Airflow/publication-v0.3/experiment_batch_small/time/"
folder_training_time: "/eos/project/i/it-cloud-data-analytics/Airflow/publication-v0.3/experiment_batch_small_antonin/scores/"
# Path to save the anomaly scores created by your algo.
# Note that this path should be accessible to the executor of the analysis
# (e.g. your VM or your container). Typically this is in a local folder if
# we work with our VM as main executor or in a shared volume if we work on
# a cluster.
local_scores_folder: "/eos/project/i/it-cloud-data-analytics/Airflow/publication-v0.3/experiment_batch_small/scores/"
local_scores_folder: "/eos/project/i/it-cloud-data-analytics/Airflow/publication-v0.3/experiment_batch_small_antonin/scores/"
# Number of most serious anomalies to send to MONIT for every temporal
# window of analysis.
......@@ -236,11 +236,11 @@ hostgroup_abs_path: "cloud_compute/level2/batch/gva_project_013"
# because this will be passed as a parameter on the command line
# Note that if the DAGs experiment has produced fewer scores than the interval
# specified here, only the intersection will be considered.
start_benchmark: "2020-02-14_16:00:00"
start_benchmark: "2020-02-14_00:00:00"
end_benchmark: "2020-03-14_00:00:00"
# Path to save the artifacts of the evaluation section
# Note that it might contain: annotations, plots, summarized table etc.
evaluation_artifact_path: "/eos/project/i/it-cloud-data-analytics/Airflow/publication-v0.3/experiment_batch_small/results/"
evaluation_artifact_path: "/eos/project/i/it-cloud-data-analytics/Airflow/publication-v0.3/experiment_batch_small_antonin/results/"
...
......@@ -12,7 +12,7 @@ hostgroups:
# This is the template name; each variable in brackets is replaced by the
# corresponding value via jinja templating.
# Note that this name is the same used both for HDFS and your local copies.
code_project_name: "batch_large_013_{{ start_date }}_{{ end_date }}_{{ start_date_normalization}}_{{ end_date_normalization }}"
code_project_name: "batch_small_013_inference_{{ start_date }}_{{ end_date }}_{{ start_date_normalization}}_{{ end_date_normalization }}"
# ----------------------------------------------------------------
# LOCAL
......@@ -22,7 +22,7 @@ code_project_name: "batch_large_013_{{ start_date }}_{{ end_date }}_{{ start_dat
# data are saved in folders each containing a single parquet file
# metadata are saved in a file with the same name as the respective folder
# plus the ".metadata" extension
local_cache_folder: "/eos/project/i/it-cloud-data-analytics/eos-datalake/dataset-benchmark/large_batch/"
local_cache_folder: "/eos/project/i/it-cloud-data-analytics/eos-datalake/dataset-benchmark/small_batch_antonin/"
# ----------------------------------------------------------------
# HDFS
......@@ -137,55 +137,55 @@ selected_plugins:
type: swap
type_instance: free
plugin_name: swap
-  cpu__percent_wait:
-    plugin_instance: ''
-    type: percent
-    type_instance: wait
-    plugin_name: cpu
-  cpu__percent_system:
-    plugin_instance: ''
-    type: percent
-    type_instance: system
-    plugin_name: cpu
-  vmem__vmpage_io_memory_out:
-    plugin_instance: ''
-    type: vmpage_io
-    type_instance: memory
-    value_instance: out
-    plugin_name: vmem
-  interface__if_octets__tx:
-    type: if_octets
-    type_instance: ''
-    value_instance: tx
-    plugin_name: interface
-  interface__if_octets__rx:
-    type: if_octets
-    type_instance: ''
-    value_instance: rx
-    plugin_name: interface
-  df_var_percent_bytes_free:
-    plugin_instance: var
-    type: percent_bytes
-    type_instance: free
-    plugin_name: df
-  uptime__uptime_:
-    plugin_instance: ''
-    type: uptime
-    type_instance: ''
-    plugin_name: uptime
-  processes__fork_rate_:
-    plugin_instance: ''
-    type: fork_rate
-    type_instance: ''
-    plugin_name: processes
-  processes__ps_state_sleeping:
-    plugin_instance: ''
-    type: ps_state
-    type_instance: sleeping
-    plugin_name: processes
-  processes__ps_state_blocked:
-    plugin_instance: ''
-    type: ps_state
-    type_instance: blocked
-    plugin_name: processes
+  #cpu__percent_wait:
+  #  plugin_instance: ''
+  #  type: percent
+  #  type_instance: wait
+  #  plugin_name: cpu
+  #cpu__percent_system:
+  #  plugin_instance: ''
+  #  type: percent
+  #  type_instance: system
+  #  plugin_name: cpu
+  #vmem__vmpage_io_memory_out:
+  #  plugin_instance: ''
+  #  type: vmpage_io
+  #  type_instance: memory