Commit a2a146fc authored by smetaj


Merge branch 'qa-v0.4_stiven' of ssh://gitlab.cern.ch:7999/cloud-infrastructure/data-analytics into qa-v0.4_stiven

Just called git pull origin qa-v0.4_stiven
parents a34b135e 0db57ec9
......@@ -216,14 +216,14 @@ spark_etl: &template_test_spark_etl
rules:
- if: '$CI_COMMIT_BRANCH =~ /^qa.*$/ || $CI_COMMIT_TAG =~ /^v.*$/'
script:
- . ${CI_PROJECT_DIR}/tests/spark_etl/ci_test_script.sh
- . ${CI_PROJECT_DIR}/tests/spark_etl/integration/ci_test_script.sh
- start_docker_compose
after_script:
- . ${CI_PROJECT_DIR}/tests/spark_etl/ci_test_script.sh
- . ${CI_PROJECT_DIR}/tests/spark_etl/integration/ci_test_script.sh
- stop_docker_compose
artifacts:
paths:
- $CI_PROJECT_DIR/tests/spark_etl/*
- $CI_PROJECT_DIR/tests/spark_etl/integration/*
expire_in: 1 week
when: always
......
......@@ -44,4 +44,7 @@ This is a guide map of the repository:
## From where to start
Detailed procedures for newcomers (W.I.P.)
1. For a general introduction to this activity, see the [ITTF seminar](https://indico.cern.ch/event/1012703/)
1. For interactive examples, see [examples](examples)
1. For the Airflow deployment, see [control_room](control_room)
......@@ -11,6 +11,7 @@ import pandas as pd
from adcern.publisher import generate_result
from etl.spark_etl.utils import read_json_config
from adcern.publisher import create_id_for_hostrgroups_and_plugins_start_end
from adcern.sqlite3_backend import modify_db
from sklearn.preprocessing import RobustScaler
import sqlite3
......@@ -364,25 +365,22 @@ class BaseOutlierAnalyser(ABC):
"end_window": str(int(end_window)),
"noramlization_id": str(normalization_id)})
# connect to the db
conn_score = sqlite3.connect(score_folder + '/scores.db', timeout=30)
with conn_score:
c = conn_score.cursor()
# ensure the table is there
c.execute('''CREATE TABLE IF NOT EXISTS scores
(hostgroup text, hostname text, algorithm text,
score real, end_window int, noramlization_id text,
PRIMARY KEY (hostgroup, hostname, algorithm,
end_window, noramlization_id))''')
# add row by row
num_rows = len(df)
for i in range(num_rows):
# Try inserting the row
row = df.iloc[i]
c.execute('''INSERT OR IGNORE INTO scores
VALUES (?, ?, ?, ?, ?, ?)''', row)
conn_score.commit()
conn_score = sqlite3.connect(score_folder + '/scores.db', timeout=120)
modify_db(conn_score, '''CREATE TABLE IF NOT EXISTS scores
(hostgroup text, hostname text, algorithm text,
score real, end_window int, noramlization_id text,
PRIMARY KEY (hostgroup, hostname, algorithm,
end_window, noramlization_id))''')
# add row by row
num_rows = len(df)
for i in range(num_rows):
# Try inserting the row
row = df.iloc[i]
modify_db(conn_score, '''INSERT OR IGNORE INTO scores
VALUES (?, ?, ?, ?, ?, ?)''', row)
conn_score.close()
......
......@@ -22,6 +22,7 @@ from hashlib import blake2b
os.environ['PYSPARK_PYTHON'] = sys.executable
from etl.spark_etl.utils import read_json_config # noqa
from adcern.publisher import create_id_for_hostrgroups_and_plugins_start_end # noqa
from adcern.sqlite3_backend import modify_db
# TRADITIONAL
# from pyod.models.iforest import IForest # noqa
......@@ -227,27 +228,23 @@ def save_scores_local_sqlite(analyser,
# connect to the db
conn_score = \
sqlite3.connect(score_folder
+ '/scores_' + algorithm_name + '.db', timeout=30)
with conn_score:
c = conn_score.cursor()
# ensure the table is there
c.execute('''CREATE TABLE IF NOT EXISTS scores
(hostgroup text, hostname text, algorithm text,
score real, end_window int, noramlization_id text,
PRIMARY KEY (hostgroup, hostname, algorithm, end_window,
noramlization_id))''')
conn_score.commit()
# add row by row
num_rows = len(df)
for i in range(num_rows):
# Try inserting the row
row = df.iloc[i]
c.execute('''INSERT OR IGNORE INTO scores
VALUES (?, ?, ?, ?, ?, ?)''', row)
conn_score.commit()
conn_score.commit()
+ '/scores_' + algorithm_name + '.db', timeout=120)
# ensure the table is there
modify_db(conn_score, '''CREATE TABLE IF NOT EXISTS scores
(hostgroup text, hostname text, algorithm text,
score real, end_window int, noramlization_id text,
PRIMARY KEY (hostgroup, hostname, algorithm, end_window,
noramlization_id))''')
# add row by row
num_rows = len(df)
for i in range(num_rows):
# Try inserting the row
row = df.iloc[i]
modify_db(conn_score, '''INSERT OR IGNORE INTO scores
VALUES (?, ?, ?, ?, ?, ?)''', row)
conn_score.close()
......@@ -405,7 +402,7 @@ def data_presence(resource_file):
@cli.command()
@click.option('--resource_file', default="",
help="""path to json file defining what to download.""")
def download_data(resource_file):
def transform_data(resource_file):
# PREPARE SPARK
sc, spark, conf = spark_preparation()
# READ RESOURCE FILE
......@@ -425,8 +422,8 @@ def download_data(resource_file):
@cli.command()
@click.option('--resource_file', default="",
help="""path to json file defining what to cache.""")
def cache_locally(resource_file):
"""Cache your data locally (aka move them from spark to local disk)."""
def copy_locally(resource_file):
"""Copy your data locally (aka move them from spark to local disk)."""
# PREPARE SPARK
sc, spark, conf = spark_preparation()
# READ RESOURCE FILE
......@@ -641,24 +638,23 @@ def analysis(module_name, class_name, alias_name, hyperparameters,
with open(file_path_config_train) as json_file:
data_dict_train = json.load(json_file)
# connect to the db
conn = sqlite3.connect(folder_training_time + '/time.db')
c = conn.cursor()
# ensure the table is there
c.execute('''CREATE TABLE IF NOT EXISTS time
(date_start text, date_end_excluded text,
long_algo_description text,
training_time real, measurement_time text,
PRIMARY KEY (date_start, date_end_excluded,
long_algo_description,
measurement_time))''')
conn.commit()
c.execute('''INSERT INTO time
conn = sqlite3.connect(folder_training_time + '/time.db', timeout=120)
modify_db(conn, '''CREATE TABLE IF NOT EXISTS time
(date_start text, date_end_excluded text,
long_algo_description text,
training_time real, measurement_time text,
PRIMARY KEY (date_start, date_end_excluded,
long_algo_description,
measurement_time))''')
modify_db(conn, '''INSERT INTO time
VALUES (?, ?, ?, ?, datetime('now', 'localtime'))''',
[data_dict_train["date_start"],
data_dict_train["date_end_excluded"],
algo_name,
training_time])
conn.commit()
[data_dict_train["date_start"],
data_dict_train["date_end_excluded"],
algo_name,
training_time])
conn.close()
# with open(file_path_config_train) as json_file:
# data_dict_train = json.load(json_file)
......
......@@ -17,6 +17,7 @@ import sklearn.metrics
from miscellanea.experiment_comparison import plot_roc # noqa
from sklearn.metrics import precision_recall_curve # noqa
import sklearn
from adcern.sqlite3_backend import modify_db
# example of start and end of week
......@@ -357,16 +358,14 @@ def score_benchmark(folder_scores, hostgroup,
aucs_weeks = []
# connect to the db
conn_score = sqlite3.connect(labels_folder + '/week_metrics.db',
timeout=30)
with conn_score:
c = conn_score.cursor()
# ensure the table is there
c.execute('''CREATE TABLE IF NOT EXISTS auc
(hostgroup text, algorithm text, family text,
auc_score real, week_index int, end_week int,
PRIMARY KEY
(hostgroup, algorithm, end_week))''')
conn_score.commit()
timeout=120)
# ensure the table is there
modify_db(conn_score, '''CREATE TABLE IF NOT EXISTS auc
(hostgroup text, algorithm text, family text,
auc_score real, week_index int, end_week int,
PRIMARY KEY
(hostgroup, algorithm, end_week))''')
# FOR EVERY WEEK -------------------------------------------------------
for w in sorted(weeks_available):
print("WEEK: ", w, end=" - ")
......@@ -385,10 +384,9 @@ def score_benchmark(folder_scores, hostgroup,
# ax=dict_ax_rocs[algo_name], alpha=0.2)
print("AUC: ", roc_auc)
aucs_weeks.append(roc_auc)
c.execute('''INSERT OR IGNORE INTO auc
modify_db(conn_score, '''INSERT OR IGNORE INTO auc
VALUES (?, ?, ?, ?, ?, ?)''',
(hostgroup, algo_name, family, roc_auc, int(w), end_week))
conn_score.commit()
conn_score.close()
# # CUMULATIVE QUANTITIES
......@@ -457,7 +455,7 @@ def score_benchmark(folder_scores, hostgroup,
def visualize_auc(hostgroup, input_folder, output_folder):
"""Visualize the AUC results form the selected algo."""
# read the database of scores in a pandas DF
conn = sqlite3.connect(input_folder + "/week_metrics.db")
conn = sqlite3.connect(input_folder + "/week_metrics.db", timeout=120)
df_week_auc = pd.read_sql_query(
"SELECT * FROM auc WHERE hostgroup='{}'".format(hostgroup), conn)
conn.close()
......
import sqlite3
import time
def modify_db(conn, query, *args):
    """Wrapper for modifications (INSERT, UPDATE, DELETE or REPLACE) of a Sqlite3 database.

    Executes the given query and commits it; if the database is locked,
    both the execute and the commit are retried.

    Params
    ------
    conn: sqlite3.Connection
        existing sqlite3 connection to use
    query: str
        query to run
    args: sequence, optional
        bind parameters for the query
    """
    c = conn.cursor()
    for x in range(1, 11):
        try:
            if args:
                # call sites pass the bind parameters as a single sequence
                c.execute(query, *args)
            else:
                c.execute(query)
        except sqlite3.OperationalError:
            print("Sqlite3 execute unsuccessful, retrying after %s sec..." % x)
            time.sleep(x)
        else:
            print("Sqlite3 execute successful, breaking the retry cycle.")
            break
    # retry the commit - the db might be locked by a different process
    for x in range(1, 11):
        try:
            conn.commit()
        except sqlite3.OperationalError:
            print("Sqlite3 commit unsuccessful, retrying after %s sec..." % x)
            time.sleep(x)
        else:
            print("Sqlite3 commit successful, breaking the retry cycle.")
            break
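A minimal usage sketch of `modify_db`, mirroring the calls made elsewhere in this commit; the database path and the inserted values are hypothetical, and the column name `noramlization_id` is spelled as in the repo schema:

``` python
import sqlite3

from adcern.sqlite3_backend import modify_db

# hypothetical path and values, for illustration only
conn = sqlite3.connect('/tmp/scores.db', timeout=120)
modify_db(conn, '''CREATE TABLE IF NOT EXISTS scores
             (hostgroup text, hostname text, algorithm text,
              score real, end_window int, noramlization_id text,
              PRIMARY KEY (hostgroup, hostname, algorithm,
                           end_window, noramlization_id))''')
modify_db(conn, '''INSERT OR IGNORE INTO scores
             VALUES (?, ?, ?, ?, ?, ?)''',
          ['cloud_example/batch', 'host001.cern.ch', 'IsolationForest',
           0.42, 1612137600, 'norm_id_example'])
conn.close()
```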
......@@ -102,27 +102,27 @@ def ad_etl_dag(dag_id='give_me_a_name', override_params={}):
)
ad_tasks = [
('local_data_presence', 'data_mining data_presence --resource_file ad_config_file.json', 'all_success'),
('normalization_presence', 'data_mining normalization_presence --resource_file ad_config_file.json', 'all_failed'),
('compute_normalization' , 'data_mining compute_normalization --resource_file ad_config_file.json', 'all_failed'),
('spark_etl_to_hdfs', 'data_mining download_data --resource_file ad_config_file.json', 'one_success'),
('from_hdfs_to_local', 'data_mining cache_locally --resource_file ad_config_file.json', 'all_success'),
('check_local_data_presence', 'data_mining data_presence --resource_file ad_config_file.json', 'all_success'),
('spark_normalization_presence', 'data_mining normalization_presence --resource_file ad_config_file.json', 'all_failed'),
('spark_compute_normalization' , 'data_mining compute_normalization --resource_file ad_config_file.json', 'all_failed'),
('spark_transform_data', 'data_mining transform_data --resource_file ad_config_file.json', 'one_success'),
('spark_mv_data_to_local', 'data_mining copy_locally --resource_file ad_config_file.json', 'all_success'),
]
for atask in ad_tasks:
globals()[atask[0]] = return_configured_BashOperator(*atask)
# Start by checking the local data presence; if the data are already there, go straight to the exit status
local_data_presence >> dag_exit_status
check_local_data_presence >> dag_exit_status
# Otherwise, if the data presence check fails, check the normalization presence
local_data_presence >> normalization_presence
check_local_data_presence >> spark_normalization_presence
# if the normalization is missing, compute it and then download the data
normalization_presence >> compute_normalization >> spark_etl_to_hdfs
spark_normalization_presence >> spark_compute_normalization >> spark_transform_data
# if the normalization presence check succeeds, start downloading the data immediately
normalization_presence >> spark_etl_to_hdfs
# Finally cache data locally (#FIXME what does it mean?)
spark_etl_to_hdfs >> from_hdfs_to_local
spark_normalization_presence >> spark_transform_data
# Finally move the data to a local folder (can also be EOS)
spark_transform_data >> spark_mv_data_to_local
from_hdfs_to_local >> dag_exit_status
spark_mv_data_to_local >> dag_exit_status
return dag
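The helper `return_configured_BashOperator` is called above but not shown in this hunk. Below is a minimal sketch of what such a helper presumably looks like, assuming Airflow 2.x; in the real repository the bash command may well be wrapped further (e.g. to run inside a container image):

``` python
from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 2.x import path (assumption)
from airflow.utils.dates import days_ago

dag = DAG('ad_etl_sketch', start_date=days_ago(1), schedule_interval=None)


def return_configured_BashOperator(task_id, command, trigger_rule):
    # trigger_rule ('all_success', 'all_failed', 'one_success') decides when
    # the task fires, based on the state of its upstream tasks
    return BashOperator(
        task_id=task_id,
        bash_command=command,
        trigger_rule=trigger_rule,
        dag=dag,
    )


check_local_data_presence = return_configured_BashOperator(
    'check_local_data_presence',
    'data_mining data_presence --resource_file ad_config_file.json',
    'all_success')
```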
......@@ -36,16 +36,27 @@ TIMESTAMP_TO_CONSIDER = 'event_timestamp'
# SPARK MANAGEMENT -----------------------------------------------------------
def set_spark(spark_conf=None, view_ui=False):
"""Set Spark.
NB: kerberos file in /tmp is expected
def set_spark(spark_conf=None, view_ui=False, debug=False):
"""Set/Get SparkContext and Spark Object.
If a SparkContext already exists, this is returned.
Otherwise a new SparkContext is created using the defined SparkConf
Some parameters of the SparkConf are forced to be compatible with the
CERN Spark Cluster (such as ports)
"""
if spark_conf is None:
spark_conf = SparkConf()
# spark_conf.set('spark.driver.memory', '8g')
# spark_conf.set('spark.executor.memory', '2g')
# spark_conf.set('spark.python.worker.memory', '1g')
# Prasanth ADVICE
'''
spark_conf.set('spark.executor.memory', '8g')
spark_conf.set('spark.executor.cores', 4)
spark_conf.set('spark.sql.shuffle.partitions', 200)
'''
spark_conf.set('spark.sql.shuffle.partitions', 200)
spark_conf.set('spark.driver.maxResultSize', 0)
# spark_conf.set('spark.dynamicAllocation.enabled', False)
......@@ -89,48 +100,19 @@ def set_spark(spark_conf=None, view_ui=False):
# spark_conf.set('spark.driver.extraClassPath', extra_class)
# spark_conf.set('spark.driver.extraClassPath','/eos/project/s/swan/public/hadoop-mapreduce-client-core-2.6.0-cdh5.7.6.jar')
# Prasanth ADVICE
'''
spark_conf.set('spark.executor.memory', '8g')
spark_conf.set('spark.executor.cores', 4)
spark_conf.set('spark.sql.shuffle.partitions', 200)
'''
# Empirically working for 5 days of data for cells
# with more than ca 75 hosts
# spark_conf.set('spark.driver.memory', '8g')
# spark_conf.set('spark.executor.memory', '8g')
# spark_conf.set('spark.python.worker.memory', '6g')
# spark_conf.set('spark.sql.shuffle.partitions', 600)
# spark_conf.set('spark.yarn.web-proxy.address', '172.17.0.1')
# spark_conf.set('spark.web-proxy.address', '172.17.0.1')
# spark_conf.set('spark.sql.shuffle.partitions', 30)
# java empty result error (with 200 partitions and 1 week of data)
# solution: decrease the nr of partitions
# for some strange reason there were too many partitions, so it ended up
# writing empty partitions, and this made it crash while trying to pop from
# an empty queue. Lowering the number of partitions seems to work. For 5-6
# days of data -> 10 partitions. It is not clear what relation is needed,
# because increasing the number of days then leads to an out-of-memory
# error.. the whole matter is a bit obscure...
# spark_conf.set('spark.driver.maxResultSize', 0)
# spark_conf.set('spark.storage.memoryFraction', 0.1)
print(functools.reduce(lambda x, y:
"%s\n%s\t%s" % (x, y[0], y[1]),
spark_conf.getAll(),
"Configuration data:"))
sc = SparkContext(conf=spark_conf)
sc = SparkContext.getOrCreate(spark_conf)
spark = SparkSession(sc)
print(functools.reduce(lambda x, y:
"%s\n%s:\t%s" % (x, y[0], y[1]),
spark.sparkContext.getConf().getAll(),
"Configuration data after setup:"))
print(spark.sparkContext.getConf().getAll())
if debug:
final_conf = spark.sparkContext.getConf().getAll()
custom_conf = dict(spark_conf.getAll())
print(functools.reduce(lambda x, y:
"%s\n%50s: %-150s | %-200s" % \
(x, y[0], y[1], custom_conf[y[0]] \
if (y[0] in custom_conf.keys() and custom_conf[y[0]]!=y[1]) \
else ('same' if (y[0] in custom_conf.keys()) else '-') ),
final_conf,
"\nSparkConf of this Spark session Vs Custom Conf passed (after `|` ):\n"))
return sc, spark, spark_conf
......
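A short usage sketch of the get-or-create behaviour described in the new docstring; the configuration values below are illustrative assumptions, and the environment (e.g. Swan) is assumed to already provide Kerberos/HDFS access:

``` python
from pyspark import SparkConf

from etl.spark_etl import cluster_utils

my_conf = SparkConf()
my_conf.set('spark.executor.memory', '4g')        # illustrative value, not from the repo
my_conf.set('spark.sql.shuffle.partitions', 200)

# First call: a SparkContext is created from the given SparkConf
sc, spark, conf = cluster_utils.set_spark(spark_conf=my_conf, debug=True)

# Later calls reuse the already running SparkContext (getOrCreate semantics)
sc_again, spark_again, _ = cluster_utils.set_spark()
print(sc is sc_again)   # expected True while the first context is still active
```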
# Collection of interactive examples
The examples here mainly target interactive work via jupyter notebooks and/or Swan.
The same procedures can run in batch mode without jupyter notebooks.
For this alternative approach the CI examples in [tests/adcern](../tests/adcern) and [tests/spark_etl](../tests/spark_etl) can be inspected.
## Interactive example with Swan-Spark
The notebook [example_spark_etl_via_swan.ipynb](example_spark_etl_via_swan.ipynb) gives an example of how to interact with Spark using Swan.
It shows how to:
- install the spark_etl libs
- authenticate to Spark
- run data extraction for a set of Rally data
## Interactive run of Anomaly Detection pipelines
The notebook [AD_system_demo_1.ipynb](AD_system_demo_1.ipynb) shows the major steps needed to extract Collectd data from HDFS
and organize them in Pandas dataframes. The dataframes are then analysed with various algorithms for outlier detection.
All the procedures use methods implemented in the [adcern](../adcern) lib of this repo.
The notebook can be run with Swan, so that the configuration needed to access the Analytix cluster is handled centrally by the Swan setup.
The same notebook can be run in a dedicated container, based on the sparknotebook image that is built in this project and distributed in the project's GitLab registry. This image already contains all the needed libs, including a jupyter installation.
%% Cell type:markdown id: tags:
# Test access of Spark via pySPark
%% Cell type:markdown id: tags:
This notebook installs the data-analytics package
and tests the basic functionalities.
In order to run it in Swan, follow those steps
1) pass your kerberos credentials
2) install the package, using a specific tag (in the example is qa)
3) run
In order to run it in Swan, follow these steps<br>
1. Start the swan-spark cluster connection
   - the Star icon in the toolbar
1. Install the data-analytics package, using a specific tag of the repository (in the example it is v0.4rc5)
   - NB: this can take up to 10 minutes, be patient. It needs to be done only once, if not already done
1. Run the rest of the notebook
   - to extract data from HDFS MONIT and build a derived dataframe
%% Cell type:code id: tags:
``` python
import getpass
import os, sys
os.environ['PYTHONPATH']=os.environ['HOME']+'/.local/lib/python3.6/site-packages:'+os.environ['PYTHONPATH']
```
%% Cell type:markdown id: tags:
## Start Spark (click the "star")
%% Cell type:markdown id: tags:
## Install the package (if not done already)
%% Cell type:code id: tags:
``` python
%%bash
#install_branch=qa
#pip3 install --user git+https://:@gitlab.cern.ch:8443/cloud-infrastructure/data-analytics.git@${install_branch}
!pip3 install --user git+https://:@gitlab.cern.ch:8443/cloud-infrastructure/data-analytics.git
```
%% Cell type:markdown id: tags:
%% Cell type:code id: tags:
# Test package
``` python
# Extend PATH
os.environ['PATH']=os.environ['HOME']+'/.local/bin:'+os.environ['PATH']
os.environ['PATH']
```
%% Cell type:code id: tags:
``` python
from etl.spark_etl import cluster_utils
# Let's verify that the version is still 3.8
!python -V
```
%% Cell type:code id: tags:
``` python
swan_spark_conf.getAll()
os.environ['PYTHONPATH']=os.environ['HOME']+'/.local/lib/python3.8/site-packages:'+os.environ['PYTHONPATH']
# Extend PYTHONPATH
os.environ['PYTHONPATH']
```
%% Cell type:markdown id: tags:
# Access spark using cluster_utils lib
%% Cell type:code id: tags:
``` python
# Test connection to Spark
#sc, spark, conf = cluster_utils.set_spark(swan_spark_conf)
# NB: if the library doesn't get loaded
# the first time after installation,
# please restart your notebook
from etl.spark_etl import cluster_utils
```
%% Cell type:code id: tags:
``` python
sc
# This spark config comes from Swan (did you start the cluster by pressing the Star button?!)
swan_spark_conf.getAll()
```
%% Cell type:code id: tags:
``` python
spark
# Let's access Spark, spark context and configuration (the one above)
sc, spark, conf = cluster_utils.set_spark(swan_spark_conf)
```
%% Cell type:code id: tags:
``` python
# Test stopping spark session
#cluster_utils.stop_spark(sc,spark)
sc
```
%% Cell type:code id: tags:
``` python
# Start again the spark session
#sc, spark, conf = cluster_utils.set_spark()
spark
```
%% Cell type:markdown id: tags:
# Test data extraction
%% Cell type:markdown id: tags:
In this example we access the rally data, extract a subset of the data
and store the results in a different output path (one file per day)
%% Cell type:code id: tags:
``` python
#Configuration PARAMS
#Define configuration parameters
#output dir in user HDFS area
outbasepath="test_rally_errors"
#output base file name
outbasefile="rally_errors.parquet"
#input file path with data to process with spark
inbasepath="/project/monitoring/archive/openstack/logs/generic/rallytester/2021/0*/01"
inbasepath="/project/monitoring/archive/openstack/logs/generic/rallytester/2021/*/01"
#schema file
schemafile='rally_schema.json'
```
%% Cell type:code id: tags:
``` python
!hdfs dfs -ls /project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01
```
%% Cell type:code id: tags:
``` python
cluster_utils.get_list_dirs('/project/monitoring/archive/openstack/logs/generic/rallytester/2021/02/01')
```
%% Cell type:code id: tags: