Commit 0db57ec9 authored by Domenico Giordano

Merge branch 'qa-v0.4' into stiv_notebook

parents d1e2cf93 39b8e883
......@@ -44,4 +44,7 @@ This is a guide map of the repository:
## Where to start
Detailed procedures for newcomers (W.I.P.)
1. For a general introduction to this activity, see the [ITTF seminar](https://indico.cern.ch/event/1012703/)
1. For interactive examples, see [examples](examples)
1. For the Airflow deployment, see [control_room](control_room)
......@@ -402,7 +402,7 @@ def data_presence(resource_file):
@cli.command()
@click.option('--resource_file', default="",
help="""path to json file defining what to download.""")
def download_data(resource_file):
def transform_data(resource_file):
# PREPARE SPARK
sc, spark, conf = spark_preparation()
# READ RESOURCE FILE
......@@ -422,8 +422,8 @@ def download_data(resource_file):
@cli.command()
@click.option('--resource_file', default="",
help="""path to json file defining what to cache.""")
def cache_locally(resource_file):
"""Cache your data locally (aka move them from spark to local disk)."""
def copy_locally(resource_file):
"""Copy your data locally (aka move them from spark to local disk)."""
# PREPARE SPARK
sc, spark, conf = spark_preparation()
# READ RESOURCE FILE
......@@ -638,24 +638,23 @@ def analysis(module_name, class_name, alias_name, hyperparameters,
with open(file_path_config_train) as json_file:
data_dict_train = json.load(json_file)
# connect to the db
conn = sqlite3.connect(folder_training_time + '/time.db', timeout = 120)
c = conn.cursor()
# ensure the table is there
c.execute('''CREATE TABLE IF NOT EXISTS time
(date_start text, date_end_excluded text,
long_algo_description text,
training_time real, measurement_time text,
PRIMARY KEY (date_start, date_end_excluded,
long_algo_description,
measurement_time))''')
conn.commit()
c.execute('''INSERT INTO time
conn = sqlite3.connect(folder_training_time + '/time.db', timeout=120)
modify_db(conn, '''CREATE TABLE IF NOT EXISTS time
(date_start text, date_end_excluded text,
long_algo_description text,
training_time real, measurement_time text,
PRIMARY KEY (date_start, date_end_excluded,
long_algo_description,
measurement_time))''')
modify_db(conn, '''INSERT INTO time
VALUES (?, ?, ?, ?, datetime('now', 'localtime'))''',
[data_dict_train["date_start"],
data_dict_train["date_end_excluded"],
algo_name,
training_time])
conn.commit()
[data_dict_train["date_start"],
data_dict_train["date_end_excluded"],
algo_name,
training_time])
conn.close()
# with open(file_path_config_train) as json_file:
# data_dict_train = json.load(json_file)
......
import sqlite3
import time
def modify_db(conn,
query,
......@@ -16,21 +17,30 @@ def modify_db(conn,
args: array
additional arguments for execute query, optional
"""
with conn.cursor() as c:
if args:
c.execute(query,args)
else:
c.execute(query)
    # sqlite3 cursors are not context managers, so create one explicitly
    c = conn.cursor()
    # retry execute - db might be locked by a different process
    for x in range(1, 11):
        try:
            if args:
                c.execute(query, args)
            else:
                c.execute(query)
        except sqlite3.Error:
            print("Sqlite3 execute unsuccessful, retrying after %s sec..." % x)
            time.sleep(x)
        else:
            print("Sqlite3 execute successful, breaking the retry cycle.")
            break
# retry commit - db might be locked by different process
for x in range(0, 10):
for x in range(1, 11):
try:
conn.commit()
except:
print("Commit to sqlite unsuccessful, retrying....")
time.sleep(1)
print("Sqlite3 commit unsuccessful, retrying after %s sec...." %x)
time.sleep(x)
pass
finally:
else:
print("Sqlite3 commit successful, breaking the retry cycle.")
break
\ No newline at end of file
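For orientation, here is a minimal usage sketch of the modify_db helper above, mirroring the training-time bookkeeping done in the analysis command earlier in this commit. The function name record_training_time is hypothetical, the snippet assumes it lives in the same module as modify_db, and the table schema is copied from the diff above.

import sqlite3


def record_training_time(db_path, date_start, date_end_excluded,
                         algo_name, training_time):
    """Hypothetical helper: store one training-time measurement."""
    # a generous timeout already mitigates 'database is locked' errors
    conn = sqlite3.connect(db_path, timeout=120)
    # ensure the table exists (same schema as in the analysis command above)
    modify_db(conn, '''CREATE TABLE IF NOT EXISTS time
                       (date_start text, date_end_excluded text,
                        long_algo_description text,
                        training_time real, measurement_time text,
                        PRIMARY KEY (date_start, date_end_excluded,
                                     long_algo_description,
                                     measurement_time))''')
    # parameterized insert; modify_db retries both execute and commit
    # if another process holds the database lock
    modify_db(conn, '''INSERT INTO time
                       VALUES (?, ?, ?, ?, datetime('now', 'localtime'))''',
              [date_start, date_end_excluded, algo_name, training_time])
    conn.close()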
......@@ -102,27 +102,27 @@ def ad_etl_dag(dag_id='give_me_a_name', override_params={}):
)
ad_tasks = [
('local_data_presence', 'data_mining data_presence --resource_file ad_config_file.json', 'all_success'),
('normalization_presence', 'data_mining normalization_presence --resource_file ad_config_file.json', 'all_failed'),
('compute_normalization' , 'data_mining compute_normalization --resource_file ad_config_file.json', 'all_failed'),
('spark_etl_to_hdfs', 'data_mining download_data --resource_file ad_config_file.json', 'one_success'),
('from_hdfs_to_local', 'data_mining cache_locally --resource_file ad_config_file.json', 'all_success'),
('check_local_data_presence', 'data_mining data_presence --resource_file ad_config_file.json', 'all_success'),
('spark_normalization_presence', 'data_mining normalization_presence --resource_file ad_config_file.json', 'all_failed'),
('spark_compute_normalization' , 'data_mining compute_normalization --resource_file ad_config_file.json', 'all_failed'),
('spark_transform_data', 'data_mining transform_data --resource_file ad_config_file.json', 'one_success'),
('spark_mv_data_to_local', 'data_mining copy_locally --resource_file ad_config_file.json', 'all_success'),
]
for atask in ad_tasks:
globals()[atask[0]] = return_configured_BashOperator(*atask)
# Start by checking the local data presence; if the data are already there, end the pipeline
local_data_presence >> dag_exit_status
check_local_data_presence >> dag_exit_status
# Otherwise, if the data presence check fails, check the normalization presence
local_data_presence >> normalization_presence
check_local_data_presence >> spark_normalization_presence
# if the normalization is missing, compute it and then download the data
normalization_presence >> compute_normalization >> spark_etl_to_hdfs
spark_normalization_presence >> spark_compute_normalization >> spark_transform_data
# if the normalization presence check succeeds, immediately start downloading the data
normalization_presence >> spark_etl_to_hdfs
# Finally cache data locally (#FIXME what does it mean?)
spark_etl_to_hdfs >> from_hdfs_to_local
spark_normalization_presence >> spark_transform_data
# Finally move the data to a local folder (which can also be EOS)
spark_transform_data >> spark_mv_data_to_local
from_hdfs_to_local >> dag_exit_status
spark_mv_data_to_local >> dag_exit_status
return dag
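The wiring above does its branching purely through Airflow trigger rules: spark_normalization_presence and spark_compute_normalization run only when their upstream check fails ('all_failed'), while spark_transform_data starts as soon as one of its upstream paths succeeds ('one_success'). Below is a hypothetical sketch of a helper with the same call pattern as return_configured_BashOperator, whose real implementation is not shown in this diff; the Airflow 2 import path is an assumption.

from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator in Airflow 1.10


def configured_bash_operator_sketch(task_id, command, trigger_rule, dag):
    """Build a BashOperator whose trigger_rule decides when it may start.

    'all_success' -> run only if every upstream task succeeded
    'all_failed'  -> run only if every upstream task failed
    'one_success' -> run as soon as any upstream task succeeded
    """
    # the real helper takes (task_id, command, trigger_rule) and presumably
    # reuses the dag defined inside ad_etl_dag; it is passed explicitly here
    # only to keep the sketch self-contained
    return BashOperator(task_id=task_id,
                        bash_command=command,
                        trigger_rule=trigger_rule,
                        dag=dag)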
......@@ -36,16 +36,27 @@ TIMESTAMP_TO_CONSIDER = 'event_timestamp'
# SPARK MANAGEMENT -----------------------------------------------------------
def set_spark(spark_conf=None, view_ui=False):
"""Set Spark.
NB: kerberos file in /tmp is expected
def set_spark(spark_conf=None, view_ui=False, debug=False):
"""Set/Get SparkContext and Spark Object.
If a SparkContext already exists, this is returned.
Otherwise a new SparkContext is created using the defined SparkConf
Some parameters of the SparkConf are forced to be compatible with the
CERN Spark Cluster (such as ports)
"""
if spark_conf is None:
spark_conf = SparkConf()
# spark_conf.set('spark.driver.memory', '8g')
# spark_conf.set('spark.executor.memory', '2g')
# spark_conf.set('spark.python.worker.memory', '1g')
# Prasanth ADVICE
'''
spark_conf.set('spark.executor.memory', '8g')
spark_conf.set('spark.executor.cores', 4)
spark_conf.set('spark.sql.shuffle.partitions', 200)
'''
spark_conf.set('spark.sql.shuffle.partitions', 200)
spark_conf.set('spark.driver.maxResultSize', 0)
# spark_conf.set('spark.dynamicAllocation.enabled', False)
......@@ -89,48 +100,19 @@ def set_spark(spark_conf=None, view_ui=False):
# spark_conf.set('spark.driver.extraClassPath', extra_class)
# spark_conf.set('spark.driver.extraClassPath','/eos/project/s/swan/public/hadoop-mapreduce-client-core-2.6.0-cdh5.7.6.jar')
# Prasanth ADVICE
'''
spark_conf.set('spark.executor.memory', '8g')
spark_conf.set('spark.executor.cores', 4)
spark_conf.set('spark.sql.shuffle.partitions', 200)
'''
# Empirically working for 5 days of data for cells
# with more than ca 75 hosts
# spark_conf.set('spark.driver.memory', '8g')
# spark_conf.set('spark.executor.memory', '8g')
# spark_conf.set('spark.python.worker.memory', '6g')
# spark_conf.set('spark.sql.shuffle.partitions', 600)
# spark_conf.set('spark.yarn.web-proxy.address', '172.17.0.1')
# spark_conf.set('spark.web-proxy.address', '172.17.0.1')
# spark_conf.set('spark.sql.shuffle.partitions', 30)
# java empty result error (with 200 partitions and 1 week of data)
# solution: decrease the nr of partitions
# for some strange reason there were too many partitions, so it ended up
# writing empty partitions, and this made it crash when trying to pop from
# an empty queue. Lowering the number of partitions seems to work. For 5-6
# days of data -> 10 partitions. The exact relation is unclear, because
# increasing the number of days then fails with an out-of-memory error...
# the whole thing is a bit obscure...
# spark_conf.set('spark.driver.maxResultSize', 0)
# spark_conf.set('spark.storage.memoryFraction', 0.1)
print(functools.reduce(lambda x, y:
"%s\n%s\t%s" % (x, y[0], y[1]),
spark_conf.getAll(),
"Configuration data:"))
sc = SparkContext.getOrCreate(spark_conf)
spark = SparkSession(sc)
print(functools.reduce(lambda x, y:
"%s\n%s:\t%s" % (x, y[0], y[1]),
spark.sparkContext.getConf().getAll(),
"Configuration data after setup:"))
print(spark.sparkContext.getConf().getAll())
if debug:
final_conf = spark.sparkContext.getConf().getAll()
custom_conf = dict(spark_conf.getAll())
print(functools.reduce(lambda x, y:
"%s\n%50s: %-150s | %-200s" % \
(x, y[0], y[1], custom_conf[y[0]] \
if (y[0] in custom_conf.keys() and custom_conf[y[0]]!=y[1]) \
else ('same' if (y[0] in custom_conf.keys()) else '-') ),
final_conf,
"\nSparkConf of this Spark session Vs Custom Conf passed (after `|` ):\n"))
return sc, spark, spark_conf
......
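A short usage sketch of the reworked set_spark (illustrative values only, not from the repository): pass a custom SparkConf and enable debug to print, for every key of the final session configuration, whether it matches the configuration that was passed in.

from pyspark import SparkConf

# build a custom configuration; the values below are examples only
custom_conf = SparkConf()
custom_conf.set('spark.executor.memory', '8g')
custom_conf.set('spark.executor.cores', '4')

# set_spark is the function shown above; with debug=True it prints the
# final SparkConf of the session next to the custom one, flagging keys
# that were forced to different values by the cluster defaults
sc, spark, conf = set_spark(spark_conf=custom_conf, debug=True)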
---
history_steps: 48
slide_steps: 48
algo_and_params:
IForest_CI_TEST:
import_path: pyod.models.iforest.IForest
family: Traditional
train_on_test: False
subsample_for_train: 1000
parameters:
n_estimators: 100
max_samples_for_train: 1000
max_samples_for_train_deep: 10000
random_seed: 42
folder_training_time: "/eos/project-i/it-cloud-data-analytics/CI_TESTS/time/"
local_scores_folder: "/eos/project-i/it-cloud-data-analytics/CI_TESTS/scores/"
publish_per_windows: 4
hostgroup_abs_path: "cloud_compute/level2/batch/gva_project_013"
start_benchmark: "2021-01-01_00:00:00"
end_benchmark: "2021-01-20_00:00:00"
evaluation_artifact_path: "/eos/project-i/it-cloud-data-analytics/CI_TESTS/results/"
...
\ No newline at end of file
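The YAML above is the CI-test experiment description consumed by the analysis step (algorithm import path, hyperparameters, time windows and output folders); its path appears in the integration test later in this commit. Below is a hedged sketch of how such a file could be inspected with PyYAML; the open path and the loop are illustrative, not the repository's actual loader.

import yaml

# path taken from the integration test in this commit; adjust as needed
with open('/work/tests/adcern/integration/adcern_cfg_experiment.yaml') as f:
    cfg = yaml.safe_load(f)

# each alias maps to an algorithm definition, e.g.
# IForest_CI_TEST -> pyod.models.iforest.IForest with {'n_estimators': 100}
for alias, algo in cfg['algo_and_params'].items():
    module_path, _, class_name = algo['import_path'].rpartition('.')
    print(alias, module_path, class_name, algo.get('parameters', {}))

print(cfg['hostgroup_abs_path'], cfg['start_benchmark'], cfg['end_benchmark'])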
......@@ -14,7 +14,7 @@
"slide_steps": 48,
"future_steps": 0,
"date_start": "2021-01-10",
"date_end_excluded": "2021-01-17",
"date_end_excluded": "2021-01-11",
"date_start_normalization": "2021-01-03",
"date_end_normalization_excluded": "2021-01-10",
"selected_plugins": {
......
......@@ -56,26 +56,40 @@ else
fi
echo -e "\n@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
echo -e "\ntest data_mining normalizationpresence train\n"
echo -e "\ntest data_mining normalization_presence train\n"
echo -e "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
data_mining normalization_presence --resource_file /work/tests/adcern/integration/adcern_cfg_train.json || echo 'Normalization is not present'
#fail 'test data_mining normalizationpresence'
#fail 'test data_mining normalization_presence'
echo -e "\n@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
echo -e "\ntest data_mining computenormalization train\n"
echo -e "\ntest data_mining compute_normalization train\n"
echo -e "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
data_mining compute_normalization --resource_file /work/tests/adcern/integration/adcern_cfg_train.json || fail 'test data_mining computenormalization'
data_mining compute_normalization --resource_file /work/tests/adcern/integration/adcern_cfg_train.json || fail 'test data_mining compute_normalization'
echo -e "\n@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
echo -e "\ntest data_mining downloaddata train\n"
echo -e "\ntest data_mining transform_data train\n"
echo -e "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
data_mining download_data --resource_file /work/tests/adcern/integration/adcern_cfg_train.json || fail 'test data_mining downloaddata'
data_mining transform_data --resource_file /work/tests/adcern/integration/adcern_cfg_train.json || fail 'test data_mining transform_data'
echo -e "\n@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
echo -e "\ntest data_mining downloaddata test\n"
echo -e "\ntest data_mining transform_data inference\n"
echo -e "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
data_mining download_data --resource_file /work/tests/adcern/integration/adcern_cfg_test.json || fail 'test data_mining downloaddata'
data_mining transform_data --resource_file /work/tests/adcern/integration/adcern_cfg_inference.json || fail 'test data_mining transform_data'
echo -e "\n@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"
echo -e "\ntest data_mining analysis\n"
echo -e "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
data_mining analysis --alias_name IForest_CI_TEST \
--class_name IForest \
--module_name pyod.models.iforest \
--analysis_path /work/tests/adcern/integration/adcern_cfg_experiment.yaml \
--train_path /work/tests/adcern/integration/adcern_cfg_train.json \
--test_path /work/tests/adcern/integration/adcern_cfg_inference.json \
--subsample_for_train 1000 \
--train_on_test False \
--hyperparameters "{'n_estimators': 100}" || fail 'test data_mining analysis'
\ No newline at end of file