Commit 319a52f1 authored by Matteo Paltenghi

working publish to fluentd

parent 06c57a4b
#!/usr/bin/env python
"""DAG to perform Experiment analysis, create anomaly scores."""
import os
from airflow import DAG
from datetime import datetime
from datetime import timedelta
import yaml
from custom_operators import DockerToPythonOperator
from config_tasks import get_configuration_task_daily
# CONFIGURATION VARIABLES
from config_variables import PY_SCRIPT_MINE_RESULTS
from config_variables import DOCKER_CONFIG_AREA
from config_variables import DOCKER_TMP_CONF_AREA
from config_variables import LOCAL_CONFIG_AREA
default_args = {'owner': 'airflow',
'description': 'DAG to perform Experiment analysis, '
'create anomaly scores.',
# 'start_date': datetime(2020, 2, 14),
# 'end_date': datetime(2020, 6, 14),
# 'end_date': datetime(2020, 8, 23),
'start_date': datetime(2021, 3, 1),
#'end_date': datetime(2020, 11, 14),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1)}
# Name the DAG and its config files after this file
dag_identifier = os.path.basename(__file__)
dag_identifier = dag_identifier[:dag_identifier.rfind(".")]
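# e.g. a DAG file named "my_etl_dag.py" (hypothetical name) gives
# dag_identifier "my_etl_dag" and hence the DAG id "dag_my_etl_dag"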
# DEMO
# CONFIG_TEMPLATE_FILE_NAME_TEST = "demo/etl_test.yaml"
# CONFIG_TEMPLATE_FILE_NAME_TRAIN = "demo/etl_train.yaml"
# CONFIG_TEMPLATE_ANALYSIS = "demo/template_analysis.yaml"
# FLUENTD ESSENTIAL BATCH
CONFIG_TEMPLATE_FILE_NAME_TEST = \
"always_on/etl_test_eos.yaml"
CONFIG_TEMPLATE_FILE_NAME_TRAIN = \
"always_on/etl_train_eos.yaml"
CONFIG_TEMPLATE_ANALYSIS = \
"always_on/analysis_eos.yaml"
# BENCHMARK BATCH
# CONFIG_TEMPLATE_FILE_NAME_TEST = "benchmark/batch_etl_test.yaml"
# CONFIG_TEMPLATE_FILE_NAME_TRAIN = "benchmark/batch_etl_train.yaml"
# CONFIG_TEMPLATE_ANALYSIS = "benchmark/batch_experiment.yaml"
# ESSENTIAL BATCH
# CONFIG_TEMPLATE_FILE_NAME_TEST = \
# "benchmark/essential_metrics/batch_etl_test.yaml"
# CONFIG_TEMPLATE_FILE_NAME_TRAIN = \
# "benchmark/essential_metrics/batch_etl_train.yaml"
# CONFIG_TEMPLATE_ANALYSIS = \
# "benchmark/essential_metrics/batch_experiment.yaml"
# # BENCHMARK SHARED
# CONFIG_TEMPLATE_FILE_NAME_TEST = "benchmark/etl_test_shared.yaml"
# CONFIG_TEMPLATE_FILE_NAME_TRAIN = "benchmark/etl_train_shared.yaml"
# CONFIG_TEMPLATE_ANALYSIS = "benchmark/experiment_shared_lof.yaml"
# CONFIG_TEMPLATE_ANALYSIS = "benchmark/experiment_shared.yaml"
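# The cron expression "00 3 * * *" below runs the DAG once per day at 03:00
# (Airflow evaluates cron schedules in UTC unless configured otherwise)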
with DAG('dag_' + dag_identifier,
default_args=default_args,
max_active_runs=2,
schedule_interval="00 3 * * *", catchup=True) as dag:
    # Prepare the config files for this chunk of data
t_prepare_config_file = \
get_configuration_task_daily(
dag_identifier=dag_identifier,
template_train=CONFIG_TEMPLATE_FILE_NAME_TRAIN,
template_test=CONFIG_TEMPLATE_FILE_NAME_TEST,
n_days_train=7,
n_days_test=1)
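    # e.g. with n_days_train=7 and n_days_test=1 the generated config files
    # cover a train window [ds-8, ds-1] and a test window [ds-1, ds]
    # (see get_configuration_task_daily in config_tasks.py)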
# # SELECT the algorithms you want to test
# # GET THE LIST OF ALGO TO CREATE
with open(LOCAL_CONFIG_AREA + CONFIG_TEMPLATE_ANALYSIS, 'r') as yaml_file:
analysis_dict = yaml.safe_load(yaml_file)
algo_and_params = analysis_dict["algo_and_params"]
# PREPARE CONFIG for ALL ALGORITHMS
for alias_name in algo_and_params.keys():
algo_dict = algo_and_params[alias_name]
module_and_class = str(algo_dict["import_path"])
train_on_test = "False"
if "train_on_test" in algo_dict.keys():
train_on_test = str(algo_dict["train_on_test"])
subsample_for_train = "0"
if "subsample_for_train" in algo_dict.keys():
subsample_for_train = str(algo_dict["subsample_for_train"])
module_name = str(module_and_class[:module_and_class.rfind(".")])
class_name = str(module_and_class[module_and_class.rfind(".") + 1:])
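        # e.g. import_path "pyod.models.pca.PCA" yields
        # module_name "pyod.models.pca" and class_name "PCA"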
parameters = algo_dict["parameters"]
# use the key name to save in the table
algo_name = alias_name
# CREATE TASK ANALYSIS
dict_options_analysis = {
"alias_name": alias_name,
"class_name": class_name,
"module_name": module_name,
"analysis_path": DOCKER_CONFIG_AREA + "/"
+ CONFIG_TEMPLATE_ANALYSIS,
"train_path": DOCKER_TMP_CONF_AREA + "/" +
dag_identifier + "_train_{{ds}}.json",
"test_path": DOCKER_TMP_CONF_AREA + "/" +
dag_identifier + "_test_{{ds}}.json",
"subsample_for_train": subsample_for_train,
"train_on_test": train_on_test, # for LOF try only on one window
}
t_analysis_single_algo = \
DockerToPythonOperator(
task_id="analysis_" + algo_name + "_" + dag_identifier,
python_script=PY_SCRIPT_MINE_RESULTS,
forward_hyperparameters=parameters,
function_name="analysis",
dict_options=dict_options_analysis)
# analysis only
t_prepare_config_file.set_downstream(t_analysis_single_algo)
#!/usr/bin/env python
"""DAG to perform ETL from HDFS via Spark on an Openstck Cell."""
import os
from airflow import DAG
from datetime import datetime
from datetime import timedelta
from airflow.operators.dummy_operator import DummyOperator
from custom_operators import DockerToPythonOperator
from config_tasks import get_configuration_task_daily
# CONFIGURATION VARIABLES
from config_variables import PY_SCRIPT_MINE_RESULTS
from config_variables import DOCKER_TMP_CONF_AREA
default_args = {'owner': 'airflow',
'description': 'DAG to perform ETL from HDFS via Spark '
                               'on an OpenStack Cell.',
# 'start_date': datetime(2020, 2, 14),
# 'end_date': datetime(2020, 8, 23),
'start_date': datetime(2021, 3, 1),
#'end_date': datetime(2020, 11, 14),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1)}
# Name the DAG and its config files after this file
dag_identifier = os.path.basename(__file__)
dag_identifier = dag_identifier[:dag_identifier.rfind(".")]
# DEMO
# CONFIG_TEMPLATE_FILE_NAME_TEST = "demo/etl_test.yaml"
# CONFIG_TEMPLATE_FILE_NAME_TRAIN = "demo/etl_train.yaml"
# FLUENTD ESSENTIAL BATCH
CONFIG_TEMPLATE_FILE_NAME_TEST = \
"always_on/etl_test_eos.yaml"
CONFIG_TEMPLATE_FILE_NAME_TRAIN = \
"always_on/etl_train_eos.yaml"
# BENCHMARK BATCH
# CONFIG_TEMPLATE_FILE_NAME_TEST = "benchmark/batch_etl_test.yaml"
# CONFIG_TEMPLATE_FILE_NAME_TRAIN = "benchmark/batch_etl_train.yaml"
# ESSENTIAL BATCH
# CONFIG_TEMPLATE_FILE_NAME_TEST = \
# "benchmark/essential_metrics/batch_etl_test.yaml"
# CONFIG_TEMPLATE_FILE_NAME_TRAIN = \
# "benchmark/essential_metrics/batch_etl_train.yaml"
# IMPROVE BATCH
# CONFIG_TEMPLATE_FILE_NAME_TEST = \
# "benchmark/improve_metrics/batch_etl_test.yaml"
# CONFIG_TEMPLATE_FILE_NAME_TRAIN = \
# "benchmark/improve_metrics/batch_etl_train.yaml"
# NOISE BATCH
# CONFIG_TEMPLATE_FILE_NAME_TEST = \
# "benchmark/noise_metrics/batch_etl_test.yaml"
# CONFIG_TEMPLATE_FILE_NAME_TRAIN = \
# "benchmark/noise_metrics/batch_etl_train.yaml"
# # BENCHMARK SHARED
# CONFIG_TEMPLATE_FILE_NAME_TEST = "benchmark/etl_test_shared.yaml"
# CONFIG_TEMPLATE_FILE_NAME_TRAIN = "benchmark/etl_train_shared.yaml"
with DAG('dag_' + dag_identifier,
default_args=default_args, max_active_runs=4,
schedule_interval="00 00 * * *", catchup=True) as dag:
    # Prepare the config files that define the data to be downloaded.
    # These files already contain the info about the cell;
    # they have to be recreated when the date changes.
t_prepare_config_file = \
get_configuration_task_daily(
dag_identifier=dag_identifier,
template_train=CONFIG_TEMPLATE_FILE_NAME_TRAIN,
template_test=CONFIG_TEMPLATE_FILE_NAME_TEST,
n_days_train=7,
n_days_test=1)
# CURRENT DATA {{ds}}
# DockerToPythonOperator can only substitute this "{{ds}}"
# https://airflow.apache.org/docs/stable/tutorial.html
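    # e.g. on the run with logical date 2021-03-01, "{{ds}}" in the file
    # names below is rendered as "2021-03-01"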
    # CHECK THE NORMALIZATION COEFFICIENTS ONLY ON TRAIN
    # SINCE THE SAME ONES SHOULD BE USED AT TEST
# COMMON CONFIG FILE
dict_options_norm_coeff = {
"resource_file":
DOCKER_TMP_CONF_AREA + "/" +
dag_identifier + "_train_{{ds}}.json",
}
    # CHECK IF THE NORMALIZATION COEFF IS THERE
t_check_normalization = \
DockerToPythonOperator(
task_id="check_norm_train_" + dag_identifier,
python_script=PY_SCRIPT_MINE_RESULTS,
function_name="normalizationpresence",
dict_options=dict_options_norm_coeff,
use_spark=True,
retries=0,
trigger_rule='all_failed')
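    # NOTE: with trigger_rule='all_failed' this check runs only when every
    # upstream data-presence check failed, i.e. nothing is cached locally yet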
# CHECK IF DATA ARE IN THE CACHE FOR BOTH TRAIN AND TEST
for SUFFIX in ["train", "test"]:
# COMMON CONFIG FILE - FOR ALL THE TASKS
dict_options_cache_task = {
"resource_file":
DOCKER_TMP_CONF_AREA + "/" +
dag_identifier + "_" + SUFFIX + "_{{ds}}.json",
}
# CHECK IF THE DATA ARE ALREADY CACHED LOCALLY
t_data_presence = \
DockerToPythonOperator(
task_id="data_presence_" + SUFFIX + "_" + dag_identifier,
python_script=PY_SCRIPT_MINE_RESULTS,
function_name="datapresence",
dict_options=dict_options_cache_task,
retries=0,
trigger_rule='one_success')
t_prepare_config_file >> t_data_presence
t_data_presence >> t_check_normalization
    # COMPUTE NORMALIZATION COEFFICIENTS
# trigger only if we don't have the coeff yet (previous failed)
t_compute_normalization = \
DockerToPythonOperator(
task_id="compute_norm_train_" + dag_identifier,
python_script=PY_SCRIPT_MINE_RESULTS,
function_name="computenormalization",
dict_options=dict_options_norm_coeff,
use_spark=True,
trigger_rule='all_failed')
t_check_normalization >> t_compute_normalization
    # DUMMY TASK TO GROUP THE TASKS AND START THE ALGO COMPUTATION
t_dummy_grouper = DummyOperator(task_id="wait_to_have_data")
# FOR BOTH TRAIN AND TEST
for SUFFIX in ["train", "test"]:
# COMMON CONFIG FILE - FOR ALL THE TASKS
dict_options_mining_tasks = {
"resource_file":
DOCKER_TMP_CONF_AREA + "/" +
dag_identifier + "_" + SUFFIX + "_{{ds}}.json",
}
        # DOWNLOAD DATA IF NOT PRESENT
# trigger only if we don't have the data yet (previous failed)
t_download_data = \
DockerToPythonOperator(
task_id="data_download_" + SUFFIX + "_" + dag_identifier,
python_script=PY_SCRIPT_MINE_RESULTS,
function_name="downloaddata",
dict_options=dict_options_mining_tasks,
use_spark=True,
trigger_rule='one_success')
t_check_normalization >> t_download_data
t_compute_normalization >> t_download_data
# MOVE DATA LOCALLY - CACHE THEM
t_cache_data = \
DockerToPythonOperator(
task_id="cache_data_" + SUFFIX + "_" + dag_identifier,
python_script=PY_SCRIPT_MINE_RESULTS,
function_name="cachelocally",
dict_options=dict_options_mining_tasks,
use_spark=True)
t_download_data >> t_cache_data
t_cache_data >> t_dummy_grouper
@@ -51,7 +51,9 @@ def get_configuration_task(dag_identifier,
def get_configuration_task_daily(dag_identifier,
template_train,
template_test,
-                                 daily=True):
+                                 daily=True,
+                                 n_days_train=7,
+                                 n_days_test=7):
"""Create task to preapares the configuration files (parallelizable).
Params
@@ -75,31 +77,37 @@ def get_configuration_task_daily(dag_identifier,
daily_addition = "_{{ ds }}"
new_train_name = dag_identifier + "_train" + daily_addition + ".json"
new_test_name = dag_identifier + "_test" + daily_addition + ".json"
+    start_train = str(n_days_train + n_days_test)
+    end_train = str(n_days_test)
templated_command = """
cat /dev/null > %s/%s
chown airflow %s/%s
cat /dev/null > %s/%s
chown airflow %s/%s
python3 %s/%s \
-    -s {{ macros.ds_add(ds, -14)}} -e {{ macros.ds_add(ds, -7) }} \
+    -s {{ macros.ds_add(ds, -%s)}} -e {{ macros.ds_add(ds, -%s) }} \
-f %s/%s -t %s/%s \
-    -n {{ macros.ds_add(ds, -14)}} -m {{ macros.ds_add(ds, -7) }}
+    -n {{ macros.ds_add(ds, -%s)}} -m {{ macros.ds_add(ds, -%s) }}
python3 %s/%s \
-    -s {{ macros.ds_add(ds, -7)}} -e {{ macros.ds_add(ds, 0) }} \
+    -s {{ macros.ds_add(ds, -%s)}} -e {{ macros.ds_add(ds, 0) }} \
-f %s/%s -t %s/%s \
-    -n {{ macros.ds_add(ds, -14)}} -m {{ macros.ds_add(ds, -7) }}
+    -n {{ macros.ds_add(ds, -%s)}} -m {{ macros.ds_add(ds, -%s) }}
""" % (LOCAL_TMP_CONF_AREA, new_train_name,
LOCAL_TMP_CONF_AREA, new_train_name,
LOCAL_TMP_CONF_AREA, new_test_name,
LOCAL_TMP_CONF_AREA, new_test_name,
LOCAL_SCRIPTS_AREA, PY_CREATE_CONFIG_FILES,
+           start_train, end_train,
LOCAL_TMP_CONF_AREA, new_train_name,
LOCAL_CONFIG_AREA, template_train,
+           start_train, end_train,
LOCAL_SCRIPTS_AREA, PY_CREATE_CONFIG_FILES,
+           end_train,
LOCAL_TMP_CONF_AREA, new_test_name,
-           LOCAL_CONFIG_AREA, template_test) # noqa
+           LOCAL_CONFIG_AREA, template_test,
+           start_train, end_train) # noqa
task_id = "prepare_config_files_" + dag_identifier
return BashOperator(task_id=task_id,
bash_command=templated_command)
@@ -17,7 +17,7 @@
# GRAFANA_KEY_PATH = "/opt/data_repo_volume/
# DEVELOPMENT - EOS
IMAGE_NAME = "gitlab-registry.cern.ch/cloud-infrastructure/data-analytics/sparknotebook:ci-qa-657abf46" # noqa
IMAGE_NAME = "gitlab-registry.cern.ch/cloud-infrastructure/data-analytics/sparknotebook:v0.2" # noqa
EOS_PROJECT_ROOT = "/eos/project/i/it-cloud-data-analytics/"
@@ -20,8 +20,8 @@ default_args = {'owner': 'airflow',
                'on an OpenStack Cell.',
# 'start_date': datetime(2020, 2, 14),
# 'end_date': datetime(2020, 8, 23),
-                'start_date': datetime(2020, 10, 14),
-                'end_date': datetime(2020, 11, 14),
+                'start_date': datetime(2021, 1, 14),
+                'end_date': datetime(2021, 2, 14),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
@@ -24,8 +24,8 @@ default_args = {'owner': 'airflow',
# 'start_date': datetime(2020, 2, 14),
# 'end_date': datetime(2020, 6, 14),
# 'end_date': datetime(2020, 8, 23),
-                'start_date': datetime(2020, 10, 14),
-                'end_date': datetime(2020, 11, 14),
+                'start_date': datetime(2021, 1, 14),
+                'end_date': datetime(2021, 2, 14),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
# Script to keep this working directory synchronized with the EOS space
# this is achieved by adding the path of this file to the crontab
# 1. call this command:
# crontab -e
# 2. add this line
# 15 * * * * /opt/data_repo_volume/repositories/data-analytics/control_room/airflow-compose/sync_with_eos.sh
# credits: https://stackoverflow.com/questions/12460279/how-to-keep-two-folders-automatically-synchronized
. /opt/local_service_user_password.sh
echo $KRB_USER_PASSW | kinit $KRB_USER@CERN.CH
# while true; do
rsync -avz --exclude .*/ --exclude cover/ --exclude data_analytics.egg-info /opt/data_repo_volume/repositories/data-analytics/* /eos/project/i/it-cloud-data-analytics/data-analytics/repo-mirror
# done
\ No newline at end of file
@@ -725,9 +725,9 @@ def analysis(module_name, class_name, alias_name, hyperparameters,
# PUBLISH in FLUENTD
# create ordered score
ordered_hosts, ordered_scores = zip(*critical_individuals)
-        #algo.publish_top_k(ts_second_window_end=ts,
-        #                   top_k=PUBLISH_PER_WINDOW,
-        #                   validity=True)
+        algo.publish_top_k(ts_second_window_end=ts,
+                           top_k=PUBLISH_PER_WINDOW,
+                           validity=True)
print("Save to Parquet...")
# SAVE TO PARQUET
---
# ----------------------------------------------------------------
# CONFIGURATION FILE TO DEFINE HOW TO ANALYSE YOUR DATA
# ----------------------------------------------------------------
# ----------------------------------------------------------------
# WINDOW OF ANALYSIS
# ----------------------------------------------------------------
# The length of every window in terms of steps.
# This number is dependent on the data we feed to the algorithm. This
# value should typically match the corresponding ETL config file for the test.
history_steps: 48
# The number of steps between two subsequent windows.
# This number is dependent on the data we feed to the algorithm. This
# value should typically match the corresponding ETL config file for the test.
# Note that when slide_steps has the same value as history_steps we have a
# non-overlapping scenario.
slide_steps: 48
# ----------------------------------------------------------------
# ALGORITHMs
# ----------------------------------------------------------------
# Algorithms to use for the analysis
# The keys correspond to the identifier of the algorithm.
# Note that it should be unique since this will be used on all your plots.
# Every element will have the following sub-keys:
# - import_path: to indicate the path to the single algorithms classes.
# e.g. path.of.module.classname.
# e.g "adcern.analyser.AEDenseTF2"
# e.g "pyod.models.pca.PCA"
#   then the last token is split off at the final "." and we do this:
#   from pyod.models.pca import PCA
#   and instantiate the object PCA() (see the sketch after this block).
# - parameters: dictionary of parameters to pass to the class of the algo
# during its initialization
# - train_on_test: default False. If True there won't be any training phase;
# every algorithm will operate on the current windows at test time. The only
# thing that comes from the train data is the normalization: the test data on
# which we perform direct prediction are normalized with the mu and sigma of
# the data extracted by the ETL for the train period.
# - subsample_for_train: default 0. It defines the number of training samples
# to extract to train your model. If -1 all the available data will be used.
# If 0 it uses the default value in the max_samples_for_train field.
# Note that subsample_for_train is used in combination with
# train_on_test = False because this allows for a bigger training set where
# subsampling makes sense.
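#
# A minimal sketch of the import logic described above (hypothetical code,
# using the standard importlib module; the repo's actual loader may differ
# in detail):
#
#   import importlib
#   module_name, class_name = import_path.rsplit(".", 1)
#   algo_class = getattr(importlib.import_module(module_name), class_name)
#   algo = algo_class(**parameters)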
algo_and_params:
PCA_S1000:
import_path: pyod.models.pca.PCA
family: Traditional
train_on_test: False
subsample_for_train: 1000
parameters: {}
# PCA_SALL:
# import_path: pyod.models.pca.PCA
# family: Traditional
# train_on_test: False
# subsample_for_train: -1
# parameters: {}
LOF_200_S1000:
import_path: pyod.models.lof.LOF
family: Traditional
train_on_test: False
subsample_for_train: 1000
parameters:
n_neighbors: 200
#KNN_200_S1000:
# import_path: pyod.models.knn.KNN
# family: Traditional
# train_on_test: False
# subsample_for_train: 1000
# parameters:
# n_neighbors: 200
#OCSVM_S1000:
# import_path: pyod.models.ocsvm.OCSVM
# family: Traditional
# train_on_test: False
# subsample_for_train: 1000
# parameters:
# nu: 0.1
IForest_S1000:
import_path: pyod.models.iforest.IForest
family: Traditional
train_on_test: False
subsample_for_train: 1000
parameters:
n_estimators: 100
# IForest_SALL:
# import_path: pyod.models.iforest.IForest
# family: Traditional
# train_on_test: False
# subsample_for_train: -1
# parameters:
# n_estimators: 100
PERC_85_SALL:
import_path: adcern.analyser_baseline.PercScore
family: Traditional
train_on_test: True
subsample_for_train: -1
parameters:
nr_timeseries: 1110
nr_timesteps: 48
percentage_above: 0.85
# PERC_60_SALL:
# import_path: adcern.analyser_baseline.PercScore
# family: Traditional
# train_on_test: True
# subsample_for_train: -1
# parameters:
# nr_timeseries: 5
# nr_timesteps: 48
# percentage_above: 0.60
# AE_LSTM_SALL:
# import_path: adcern.analyser_deep.AELstmTF2
# family: Deep
# train_on_test: False
# subsample_for_train: -1
# parameters:
# nr_timeseries: 5
# nr_timesteps: 48
# epochs: 20
# verbose: 0
#AE_Dense_SALL:
# import_path: adcern.analyser_deep.AEDenseTF2
# family: Deep
# train_on_test: False
# subsample_for_train: -1
# parameters:
# epochs: 20
# verbose: 0
#AE_CNN_SALL:
# import_path: adcern.analyser_deep.AECnnTF2
# family: Deep
# train_on_test: False
# subsample_for_train: -1
# parameters:
# nr_timeseries: 5
# nr_timesteps: 48
# epochs: 20
# verbose: 0
# ForecastCNN_SALL:
# import_path: adcern.analyser_forecasting.ForecastCNN
# family: Deep
# train_on_test: False
# subsample_for_train: -1
# parameters:
# nr_timeseries: 5
# nr_timesteps: 48
# chunk_len: 6
# epochs: 20
# verbose: 0
# pyod.models.iforest.IForest: {}
# pyod.models.pca.PCA: {}
# pyod.models.ocsvm.OCSVM:
# nu: 0.1
# adcern.analyser_baseline.PercScore:
# nr_timeseries: 11
# nr_timesteps: 48
# percentage_above: 0.85
# pyod.models.knn.KNN: {}
# adcern.analyser_deep.AEDenseTF2:
# epochs: 20
# verbose: 0
# adcern.analyser_deep.AECnnTF2:
# nr_timeseries: 11
# nr_timesteps: 48
# epochs: 20
# verbose: 0
# adcern.analyser_deep.AELstmTF2:
# nr_timeseries: 11
# nr_timesteps: 48
# epochs: 20
# verbose: 0
# adcern.analyser_forecasting.ForecastCNN:
# nr_timeseries: 11