Commit 856ab3ed authored by smetaj

Merge branch 'qa-v0.4_stiven' into stiv_newer_merge

Merging the last modifications.
parents 0a62a3dd a2a146fc
......@@ -44,4 +44,7 @@ This is a guide map of the repository:
## From where to start
Detailed procedures for newcomers (W.I.P.):
1. For a general introduction on this activity see the [ITTF seminar](https://indico.cern.ch/event/1012703/)
1. For interactive examples see [examples](examples)
1. For Airflow deployment see [control_room](control_room)
......@@ -402,7 +402,7 @@ def data_presence(resource_file):
@cli.command()
@click.option('--resource_file', default="",
help="""path to json file defining what to download.""")
def download_data(resource_file):
def transform_data(resource_file):
# PREPARE SPARK
sc, spark, conf = spark_preparation()
# READ RESOURCE FILE
......@@ -422,8 +422,8 @@ def download_data(resource_file):
@cli.command()
@click.option('--resource_file', default="",
help="""path to json file defining what to cache.""")
def cache_locally(resource_file):
"""Cache your data locally (aka move them from spark to local disk)."""
def copy_locally(resource_file):
"""Copy your data locally (aka move them from spark to local disk)."""
# PREPARE SPARK
sc, spark, conf = spark_preparation()
# READ RESOURCE FILE
......@@ -638,24 +638,23 @@ def analysis(module_name, class_name, alias_name, hyperparameters,
with open(file_path_config_train) as json_file:
data_dict_train = json.load(json_file)
# connect to the db
conn = sqlite3.connect(folder_training_time + '/time.db', timeout = 120)
c = conn.cursor()
# ensure the table is there
c.execute('''CREATE TABLE IF NOT EXISTS time
(date_start text, date_end_excluded text,
long_algo_description text,
training_time real, measurement_time text,
PRIMARY KEY (date_start, date_end_excluded,
long_algo_description,
measurement_time))''')
conn.commit()
c.execute('''INSERT INTO time
conn = sqlite3.connect(folder_training_time + '/time.db', timeout=120)
modify_db(conn, '''CREATE TABLE IF NOT EXISTS time
(date_start text, date_end_excluded text,
long_algo_description text,
training_time real, measurement_time text,
PRIMARY KEY (date_start, date_end_excluded,
long_algo_description,
measurement_time))''')
modify_db(conn, '''INSERT INTO time
VALUES (?, ?, ?, ?, datetime('now', 'localtime'))''',
[data_dict_train["date_start"],
data_dict_train["date_end_excluded"],
algo_name,
training_time])
conn.commit()
[data_dict_train["date_start"],
data_dict_train["date_end_excluded"],
algo_name,
training_time])
conn.close()
# with open(file_path_config_train) as json_file:
# data_dict_train = json.load(json_file)
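For context, here is a minimal sketch (not part of this commit) of how the recorded training times could be read back from the same `time.db`; the path is a placeholder and the columns follow the `CREATE TABLE` statement above:
```
import sqlite3

# Hypothetical path: wherever folder_training_time points to.
db_path = "/path/to/folder_training_time/time.db"
conn = sqlite3.connect(db_path, timeout=120)
c = conn.cursor()
# Columns as defined in the CREATE TABLE statement of the analysis command.
c.execute("""SELECT date_start, date_end_excluded, long_algo_description,
                    training_time, measurement_time
             FROM time
             ORDER BY measurement_time DESC""")
for row in c.fetchall():
    print(row)
conn.close()
```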
......
import sqlite3
import time
def modify_db(conn,
query,
......@@ -16,21 +17,30 @@ def modify_db(conn,
args: array
additional arguments for execute query, optional
"""
with conn.cursor() as c:
if args:
c.execute(query,args)
else:
c.execute(query)
with conn.cursor() as c:
for x in range(1, 11):
try:
if args:
c.execute(query, args)
else:
c.execute(query)
except:
print("Sqlite3 execute unsuccessful, retrying after %s sec...." %x)
time.sleep(x)
pass
else:
print("Sqlite3 execute successful, breaking the retry cycle.")
break
# retry commit - db might be locked by different process
for x in range(0, 10):
for x in range(1, 11):
try:
conn.commit()
except:
print("Commit to sqlite unsuccessful, retrying....")
time.sleep(1)
print("Sqlite3 commit unsuccessful, retrying after %s sec...." %x)
time.sleep(x)
pass
finally:
else:
print("Sqlite3 commit successful, breaking the retry cycle.")
break
\ No newline at end of file
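As a side note, the same retry-on-locked-database idea can be written as a standalone helper. The sketch below is illustrative only, not the repository code: it catches `sqlite3.OperationalError` explicitly (instead of a bare `except`) and uses a plain cursor, since `sqlite3` cursors are not context managers:
```
import sqlite3
import time


def modify_db_sketch(conn, query, args=None, retries=10):
    """Illustrative variant of modify_db: execute and commit with retries.

    The database may be locked by another process, so both the execute
    and the commit are retried with an increasing sleep.
    """
    c = conn.cursor()
    for attempt in range(1, retries + 1):
        try:
            if args:
                c.execute(query, args)
            else:
                c.execute(query)
        except sqlite3.OperationalError:
            print("Sqlite3 execute unsuccessful, retrying after %s sec..." % attempt)
            time.sleep(attempt)
        else:
            break
    for attempt in range(1, retries + 1):
        try:
            conn.commit()
        except sqlite3.OperationalError:
            print("Sqlite3 commit unsuccessful, retrying after %s sec..." % attempt)
            time.sleep(attempt)
        else:
            break
```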
......@@ -102,27 +102,27 @@ def ad_etl_dag(dag_id='give_me_a_name', override_params={}):
)
ad_tasks = [
('local_data_presence', 'data_mining data_presence --resource_file ad_config_file.json', 'all_success'),
('normalization_presence', 'data_mining normalization_presence --resource_file ad_config_file.json', 'all_failed'),
('compute_normalization' , 'data_mining compute_normalization --resource_file ad_config_file.json', 'all_failed'),
('spark_etl_to_hdfs', 'data_mining download_data --resource_file ad_config_file.json', 'one_success'),
('from_hdfs_to_local', 'data_mining cache_locally --resource_file ad_config_file.json', 'all_success'),
('check_local_data_presence', 'data_mining data_presence --resource_file ad_config_file.json', 'all_success'),
('spark_normalization_presence', 'data_mining normalization_presence --resource_file ad_config_file.json', 'all_failed'),
('spark_compute_normalization' , 'data_mining compute_normalization --resource_file ad_config_file.json', 'all_failed'),
('spark_transform_data', 'data_mining transform_data --resource_file ad_config_file.json', 'one_success'),
('spark_mv_data_to_local', 'data_mining copy_locally --resource_file ad_config_file.json', 'all_success'),
]
for atask in ad_tasks:
globals()[atask[0]] = return_configured_BashOperator(*atask)
# Start checking the local data presence and in case break pipeline
local_data_presence >> dag_exit_status
check_local_data_presence >> dag_exit_status
# Otherwise if datapresence fails, check the normalization presence
local_data_presence >> normalization_presence
check_local_data_presence >> spark_normalization_presence
# if missing the normalization compute it and then download data
normalization_presence >> compute_normalization >> spark_etl_to_hdfs
spark_normalization_presence >> spark_compute_normalization >> spark_transform_data
# if normalization presence succeeds start immediately downloading data
normalization_presence >> spark_etl_to_hdfs
# Finally cache data locally (#FIXME what does it mean?)
spark_etl_to_hdfs >> from_hdfs_to_local
spark_normalization_presence >> spark_transform_data
# Finally move data to local folder (can also be eos)
spark_transform_data >> spark_mv_data_to_local
from_hdfs_to_local >> dag_exit_status
spark_mv_data_to_local >> dag_exit_status
return dag
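The helper `return_configured_BashOperator` is not shown in this diff; a hedged sketch of what it could look like (assuming Airflow 2.x import paths; the actual repository code may differ) is:
```
from airflow.operators.bash import BashOperator


def return_configured_BashOperator(task_id, command, trigger_rule, dag=None):
    """Hypothetical sketch of the helper used above (actual code may differ).

    trigger_rule ('all_success', 'all_failed', 'one_success', ...) decides
    whether a task runs depending on how its upstream tasks ended, which is
    what drives the branching between the "data already present" path and
    the "compute normalization, then transform" path of the DAG.
    """
    return BashOperator(
        task_id=task_id,
        bash_command=command,
        trigger_rule=trigger_rule,
        dag=dag,
    )
```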
# Modules
The repository implements ETL (Extract, Transform, Loa) modules for:
The repository implements ETL (Extract, Transform, Load) modules for:
1. Grafana ETL
1. Spark ETL
......
......@@ -3,4 +3,53 @@
This folder contains the library necessary to download the HDFS data:
1. cluster_utils.py + utils.py: methods to access the Spark context and prepare HDFS paths and folders.
1. etl_steps.py: low-level operations that define how to aggregate the time series data and with which granularity; the hard part of the Spark processing is here. Note that the normalization strategy is also defined here.
1. etl_pipeline.py: how to combine the basic steps into the final ETL pipeline, from the definition of what you want to the data stored in the desired way in HDFS under the desired path.
\ No newline at end of file
1. etl_pipeline.py: how to combine the basic steps into the final ETL pipeline, from the definition of what you want to the data stored in the desired way in HDFS under the desired path.
<hr>
Let's describe in more detail the most important functions that you can find in the different files inside this folder (keep in mind that, especially for atomic operations, you can find documentation in the code about the functions and the expected parameters).
## etl_steps.py
- **data_reading**(spark, plugins, days, hostgroups)
Read all plugins for all days.
- **downsampling_and_aggregate**(spark, df_all_plugins, every_n_minutes)
Create the aggregate every n minutes (e.g. with every 10 min, data between 15:10 and 15:20 are summarized with the mean statistic and get timestamp 15:20).
- **normalize**(spark, df, df_normalization)
Subtract the mean from the value column and divide by the standard deviation.
- **compute_the_coefficients**(spark, df_aggregated)
Compute the normalization coefficients from each of the plugin columns. These coefficients are needed for the normalization step.
- **create_window**(spark, df_plugins_as_col, start_date, end_date, history_steps, future_steps, every_n_minutes)
Create a window with the timesteps for history and future. Create the lagged timesteps for each column (aka plugin); do the same for the future steps (see the sketch after this list). Note that, beforehand, all the missing timesteps have to be replaced with a null value.
- **data_writing**(spark, df_window, outfolder, every_n_minutes, history_steps, future_steps, id_plugins, id_normalization, mode='overwrite')
Save the windows partitioned in the outfolder. It saves every window in a folder path like the following:
```
<outpath>/10min/gva_project_014/2020/02/14/
```
and a filename like the following:
```
<id_plugins>_h<history_steps>_f<future_steps>_<normalization_id>
```
Note that an id to identify that group of plugins will be created.
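To make the lagging idea of create_window concrete, the fragment below is an illustrative PySpark sketch, not the repository code; the column names (`hostname`, `timestamp`, `cpu_load`) are placeholders:
```
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Toy frame: one row per (host, timestamp) with a single plugin column.
df = spark.createDataFrame(
    [("host1", 0, 1.0), ("host1", 600, 2.0), ("host1", 1200, 3.0)],
    ["hostname", "timestamp", "cpu_load"],
)

w = Window.partitionBy("hostname").orderBy("timestamp")
history_steps, future_steps = 2, 1
# Lagged columns for the history steps...
for h in range(1, history_steps + 1):
    df = df.withColumn("cpu_load_h%d" % h, F.lag("cpu_load", h).over(w))
# ...and lead columns for the future steps.
for f in range(1, future_steps + 1):
    df = df.withColumn("cpu_load_f%d" % f, F.lead("cpu_load", f).over(w))
df.show()
```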
## etl_pipeline.py
- run_pipeline_all_in_one(spark, config_filepath)
A single function called in the ETL Airflow pipeline that produces the windows datasets in HDFS, divided by day and hostgroup. (It uses almost every main function of etl_steps.py.)
- pipeline_preparation_norm_coeff(spark, config_filepath)
Run the pipeline to get the coefficients and create a normalization df. It produces normalization datasets in HDFS with normalization coefficients (e.g. mean and stddev) for every pair of (hostgroup, plugin); see the sketch after this list.
(This, for example, is not used in the "all_in_one" function above, but it is used in a dedicated Airflow task to prepare the normalization coefficients).
- ...
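As an illustration of the normalization coefficients (again a sketch, not the repository code), a per-(hostgroup, plugin) mean/stddev table can be obtained with a simple aggregation:
```
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Toy long-format frame: one measurement per (hostgroup, plugin) pair.
df = spark.createDataFrame(
    [("batch/gva_project_013", "cpu_load", 1.0),
     ("batch/gva_project_013", "cpu_load", 3.0),
     ("batch/gva_project_013", "memory_used", 10.0)],
    ["hostgroup", "plugin", "value"],
)

df_normalization = (
    df.groupBy("hostgroup", "plugin")
      .agg(F.mean("value").alias("mean"),
           F.stddev("value").alias("stddev"))
)
df_normalization.show()
# The normalize() step can then join on (hostgroup, plugin) and apply
# (value - mean) / stddev to the value column.
```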
......@@ -36,16 +36,27 @@ TIMESTAMP_TO_CONSIDER = 'event_timestamp'
# SPARK MANAGEMENT -----------------------------------------------------------
def set_spark(spark_conf=None, view_ui=False):
"""Set Spark.
NB: kerberos file in /tmp is expected
def set_spark(spark_conf=None, view_ui=False, debug=False):
"""Set/Get SparkContext and Spark Object.
If a SparkContext already exists, it is returned.
Otherwise a new SparkContext is created using the given SparkConf.
Some parameters of the SparkConf are forced to be compatible with the
CERN Spark Cluster (such as ports).
"""
if spark_conf is None:
spark_conf = SparkConf()
# spark_conf.set('spark.driver.memory', '8g')
# spark_conf.set('spark.executor.memory', '2g')
# spark_conf.set('spark.python.worker.memory', '1g')
# Prasanth ADVICE
'''
spark_conf.set('spark.executor.memory', '8g')
spark_conf.set('spark.executor.cores', 4)
spark_conf.set('spark.sql.shuffle.partitions', 200)
'''
spark_conf.set('spark.sql.shuffle.partitions', 200)
spark_conf.set('spark.driver.maxResultSize', 0)
# spark_conf.set('spark.dynamicAllocation.enabled', False)
......@@ -89,48 +100,19 @@ def set_spark(spark_conf=None, view_ui=False):
# spark_conf.set('spark.driver.extraClassPath', extra_class)
# spark_conf.set('spark.driver.extraClassPath','/eos/project/s/swan/public/hadoop-mapreduce-client-core-2.6.0-cdh5.7.6.jar')
# Prasanth ADVICE
'''
spark_conf.set('spark.executor.memory', '8g')
spark_conf.set('spark.executor.cores', 4)
spark_conf.set('spark.sql.shuffle.partitions', 200)
'''
# Empirically working for 5 days of data for cells
# with more than ca 75 hosts
# spark_conf.set('spark.driver.memory', '8g')
# spark_conf.set('spark.executor.memory', '8g')
# spark_conf.set('spark.python.worker.memory', '6g')
# spark_conf.set('spark.sql.shuffle.partitions', 600)
# spark_conf.set('spark.yarn.web-proxy.address', '172.17.0.1')
# spark_conf.set('spark.web-proxy.address', '172.17.0.1')
# spark_conf.set('spark.sql.shuffle.partitions', 30)
# java empty result error (with 200 partitions and 1 week of data)
# solution: decrease the nr of partitions
# For some strange reason there were too many partitions, so it ended up
# writing empty partitions, which made it crash while trying to pop from
# an empty queue. Lowering the number of partitions seems to work.
# For 5-6 days of data -> 10 partitions. I am not sure what relation is
# needed, because increasing the days then leads to an out-of-memory
# error... the whole thing is a bit obscure...
# spark_conf.set('spark.driver.maxResultSize', 0)
# spark_conf.set('spark.storage.memoryFraction', 0.1)
print(functools.reduce(lambda x, y:
"%s\n%s\t%s" % (x, y[0], y[1]),
spark_conf.getAll(),
"Configuration data:"))
sc = SparkContext.getOrCreate(spark_conf)
spark = SparkSession(sc)
print(functools.reduce(lambda x, y:
"%s\n%s:\t%s" % (x, y[0], y[1]),
spark.sparkContext.getConf().getAll(),
"Configuration data after setup:"))
print(spark.sparkContext.getConf().getAll())
if debug:
final_conf = spark.sparkContext.getConf().getAll()
custom_conf = dict(spark_conf.getAll())
print(functools.reduce(lambda x, y:
"%s\n%50s: %-150s | %-200s" % \
(x, y[0], y[1], custom_conf[y[0]] \
if (y[0] in custom_conf.keys() and custom_conf[y[0]]!=y[1]) \
else ('same' if (y[0] in custom_conf.keys()) else '-') ),
final_conf,
"\nSparkConf of this Spark session Vs Custom Conf passed (after `|` ):\n"))
return sc, spark, spark_conf
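For illustration, a typical call to the new signature could look like the following sketch (the import of `set_spark` is hypothetical; its module path is not shown in this diff):
```
from pyspark import SparkConf

# from <this_module> import set_spark  # hypothetical import path

my_conf = SparkConf()
my_conf.set('spark.executor.memory', '4g')

sc, spark, conf = set_spark(spark_conf=my_conf, view_ui=False, debug=True)
# With debug=True the final SparkConf of the session is printed next to the
# custom one, highlighting which settings were overridden by the defaults
# forced for the CERN Spark cluster.
```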
......
---
history_steps: 48
slide_steps: 48
algo_and_params:
IForest_CI_TEST:
import_path: pyod.models.iforest.IForest
family: Traditional
train_on_test: False
subsample_for_train: 1000
parameters:
n_estimators: 100
max_samples_for_train: 1000
max_samples_for_train_deep: 10000
random_seed: 42
folder_training_time: "/eos/project-i/it-cloud-data-analytics/CI_TESTS/time/"
local_scores_folder: "/eos/project-i/it-cloud-data-analytics/CI_TESTS/scores/"
publish_per_windows: 4
hostgroup_abs_path: "cloud_compute/level2/batch/gva_project_013"
start_benchmark: "2021-01-01_00:00:00"
end_benchmark: "2021-01-20_00:00:00"
evaluation_artifact_path: "/eos/project-i/it-cloud-data-analytics/CI_TESTS/results/"
...
\ No newline at end of file
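As background for the test of `data_mining analysis` further below, the `import_path` and `parameters` fields of this YAML map naturally onto the `--module_name`/`--class_name`/`--hyperparameters` options; a hedged sketch of that dynamic instantiation (not the repository code) is:
```
import importlib

# Values taken from the YAML above; IForest comes from the pyod package.
import_path = "pyod.models.iforest.IForest"
parameters = {"n_estimators": 100}

module_name, class_name = import_path.rsplit(".", 1)
model_class = getattr(importlib.import_module(module_name), class_name)
model = model_class(**parameters)
print(model)
```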
......@@ -14,7 +14,7 @@
"slide_steps": 48,
"future_steps": 0,
"date_start": "2021-01-10",
"date_end_excluded": "2021-01-17",
"date_end_excluded": "2021-01-11",
"date_start_normalization": "2021-01-03",
"date_end_normalization_excluded": "2021-01-10",
"selected_plugins": {
......
......@@ -56,26 +56,40 @@ else
fi
echo -e "\n@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
echo -e "\ntest data_mining normalizationpresence train\n"
echo -e "\ntest data_mining normalization_presence train\n"
echo -e "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
data_mining normalization_presence --resource_file /work/tests/adcern/integration/adcern_cfg_train.json || echo 'Normalization is not present'
#fail 'test data_mining normalizationpresence'
#fail 'test data_mining normalization_presence'
echo -e "\n@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
echo -e "\ntest data_mining computenormalization train\n"
echo -e "\ntest data_mining compute_normalization train\n"
echo -e "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
data_mining compute_normalization --resource_file /work/tests/adcern/integration/adcern_cfg_train.json || fail 'test data_mining computenormalization'
data_mining compute_normalization --resource_file /work/tests/adcern/integration/adcern_cfg_train.json || fail 'test data_mining compute_normalization'
echo -e "\n@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
echo -e "\ntest data_mining downloaddata train\n"
echo -e "\ntest data_mining transform_data train\n"
echo -e "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
data_mining download_data --resource_file /work/tests/adcern/integration/adcern_cfg_train.json || fail 'test data_mining downloaddata'
data_mining transform_data --resource_file /work/tests/adcern/integration/adcern_cfg_train.json || fail 'test data_mining transform_data'
echo -e "\n@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
echo -e "\ntest data_mining downloaddata test\n"
echo -e "\ntest data_mining transform_data inference\n"
echo -e "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
data_mining download_data --resource_file /work/tests/adcern/integration/adcern_cfg_test.json || fail 'test data_mining downloaddata'
data_mining transform_data --resource_file /work/tests/adcern/integration/adcern_cfg_inference.json || fail 'test data_mining transform_data'
echo -e "\n@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
echo -e "\ntest data_mining analysis\n"
echo -e "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
data_mining analysis --alias_name IForest_CI_TEST \
--class_name IForest \
--module_name pyod.models.iforest \
--analysis_path /work/tests/adcern/integration/adcern_cfg_experiment.yaml \
--train_path /work/tests/adcern/integration/adcern_cfg_train.json \
--test_path /work/tests/adcern/integration/adcern_cfg_inference.json \
--subsample_for_train 1000 \
--train_on_test False \
--hyperparameters "{'n_estimators': 100}" || fail 'test data_mining analysis'
\ No newline at end of file