Commit c99f06c5 authored by Matteo Paltenghi

demo dag added

parent ed5de672
@@ -16,7 +16,7 @@ This time series data can come from:
A central part of this project is the package that contains the different algorithms to analyze the time series data and raise alarms if needed.
## Easy Install Procedure
## Installation Procedure
Start the VM with the following port forwarding:
```
@@ -65,67 +65,16 @@ and comment the development section:
# IMAGE_NAME = "..."
```
## Getting Started
You can use this module in three environments:
1. in the SWAN notebook
1. in your local machine
1. in a [CERN VM for Automatic Anomaly Detection](documentation/ad-pipeline-airflow.md)
## Getting started with your first Anomaly Detection DAG
### Prerequisites
Now that Airflow is up and running we can test the anomaly detection system and
its algorithms on a demo scenario.
You need to:
1. be able to authenticate via Kerberos for Spark access:
```
kinit username@CERN.CH
```
2. have a Grafana token to access the InfluxDB and Elasticsearch data.
### Installing - SWAN (https://swan.web.cern.ch/)
1. Make sure you start SWAN connected to the right Spark cluster, which in general is the `General Purpose (Analytix)` cluster.
2. Paste this in your notebook:
``` python
# necessary to obtain the Kerberos credentials
import getpass
import os, sys
print("Please enter your kerberos password")
ret = os.system("echo \"%s\" | kinit" % getpass.getpass())
if ret == 0: print("Credentials created successfully")
else: sys.stderr.write('Error creating credentials, return code: %s\n' % ret)
```
3. Execute the cell and insert your password. Now you are logged in.
4. Paste this and execute:
```
! pip install --user git+https://:@gitlab.cern.ch:8443/cloud-infrastructure/data-analytics.git
```
5. Import the module that you need, e.g.:
``` python
from etl.spark_etl import cluster_utils
```
### Installing - Local machine
1. Log in directly from your shell with the command:
```
kinit username@CERN.CH
```
2. Insert your password.
3. Install the package directly from GitLab:
```
pip install --user git+https://:@gitlab.cern.ch:8443/cloud-infrastructure/data-analytics.git
```
## Possible use cases
![use-case-diagram](documentation/diagrams/use-case-data-analytics.png)
## Example Notebooks
You can refer to the [notebook folder](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/-/tree/master/notebooks) to explore some examples of usage.
Follow these steps:
1. Open the Airflow UI: http://localhost:8080/
1. Search for the DAG named **dag_ad_demo** and click on its name.
1. Click on the *graph view* tab to see the interconnections between the different tasks.
1. Click on the **on/off switch** next to the header *DAG: dag_ad_demo* (a command-line alternative is sketched right after this list).
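Alternatively, the DAG can be unpaused from the command line wherever the Airflow CLI is available (for instance inside the Airflow container). This is only a sketch; the exact command depends on the Airflow version:
``` python
# Unpause the demo DAG via the Airflow CLI (Airflow 1.10.x syntax shown;
# Airflow 2.x would use ["airflow", "dags", "unpause", "dag_ad_demo"]).
import subprocess

subprocess.run(["airflow", "unpause", "dag_ad_demo"], check=True)
```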
Congratulations, now the demo DAG will start and, if everything worked properly, you will see all the tasks colored dark green.
## Authors
This project is created and maintained by the IT-CM-RPS team.
In particular the main active contributors are: Domenico, Patrycja, Matteo.
# https://marclamberti.com/blog/how-to-use-dockeroperator-apache-airflow/
from airflow import DAG
from datetime import datetime
from datetime import timedelta
from airflow.utils.dates import days_ago # noqa
from template_tasks import get_download_task
from template_tasks import get_mover_task
from template_tasks import get_single_algo
from template_tasks import get_configuration_task
# os is used below to derive the DAG name from this file's name
import os
default_args = {'owner': 'airflow',
                'description': 'One analysis for every algo',
                'depends_on_past': True,
                'start_date': datetime(2020, 3, 1),
                'end_date': datetime(2020, 3, 8),
                'email_on_failure': False,
                'email_on_retry': False,
                'retries': 1,
                'retry_delay': timedelta(minutes=1)}
# Name the DAG and the config files after this file
dag_identifier = os.path.basename(__file__)
dag_identifier = dag_identifier[:dag_identifier.rfind(".")]
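# e.g. a file named "ad_demo.py" (name assumed here for illustration) would give
# dag_identifier == "ad_demo" and hence the DAG id "dag_ad_demo" used below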
CONFIG_TEMPLATE_FILE_NAME_TEST = "demo_folder/template_test.json"
CONFIG_TEMPLATE_FILE_NAME_TRAIN = "demo_folder/template_train.json"
CONFIG_TEMPLATE_ANALYSIS = "demo_folder/template_analysis.json"
with DAG('dag_' + dag_identifier,
         default_args=default_args, max_active_runs=1,
         schedule_interval="30 04 * * 0", catchup=True) as dag:
    # Prepare the config files that define the data to be downloaded.
    # These files already have the info about the cell;
    # they have to be recreated for changing the date.
    t_prepare_config_file = \
        get_configuration_task(dag_identifier=dag_identifier,
                               template_train=CONFIG_TEMPLATE_FILE_NAME_TRAIN,
                               template_test=CONFIG_TEMPLATE_FILE_NAME_TEST)
    # TRAIN-SET PREPARATION
    # Aggregate in SPARK and Download the data in HDFS
    # Creation of windows happens here
    t_trainset_download = get_download_task(dag_identifier=dag_identifier,
                                            is_train=True)
    # Move the data from HDFS parquet to a single parquet in the local
    # folder
    t_move_locally_trainset = get_mover_task(dag_identifier=dag_identifier,
                                             is_train=True)
    # TEST-SET PREPARATION
    # Aggregate in SPARK and Download the data in HDFS
    # Creation of windows happens here
    t_testset_download = get_download_task(dag_identifier=dag_identifier,
                                           is_test=True)
    # Move the data from HDFS parquet to a single parquet in the local
    # folder
    t_move_locally_testset = get_mover_task(dag_identifier=dag_identifier,
                                            is_test=True)
    # TRAIN BRANCH
    t_prepare_config_file >> t_trainset_download
    t_trainset_download >> t_move_locally_trainset
    # TEST BRANCH
    t_prepare_config_file >> t_testset_download
    t_testset_download >> t_move_locally_testset
    # SELECT the algorithms
"""
algo_and_params: dict(str: dict(str))
dictionary that has as key the name of the module and class
concatenated and as value it has the dictionary of attributes to
instantiate that module.
e.g "core.analyser.AEDenseTF2"
e.g "pyod.models.pca.PCA"
then the last token will be separated by the "." and we do this:
from pyod.models.pca import PCA
and instantiate the object PCA()
"""
    NR_TIMESERIES = 11
    NR_TIMESTEPS = 48
    algo_and_params = \
        {"pyod.models.pca.PCA": {},
         "pyod.models.knn.KNN": {},
         "pyod.models.ocsvm.OCSVM": {"nu": 0.1},
         "pyod.models.iforest.IForest": {},
         "pyod.models.lof.LOF": {"n_neighbors": 200},
         "core.analyser_deep.AEDenseTF2": {"epochs": 20, "verbose": 0},
         "core.analyser_deep.AECnnTF2": {"nr_timeseries": NR_TIMESERIES,
                                         "nr_timesteps": NR_TIMESTEPS,
                                         "epochs": 20, "verbose": 0},
         "core.analyser_deep.AELstmTF2": {"nr_timeseries": NR_TIMESERIES,
                                          "nr_timesteps": NR_TIMESTEPS,
                                          "epochs": 20, "verbose": 0},
         "core.analyser_forecasting.ForecastCNN": {
             "nr_timeseries": NR_TIMESERIES,
             "nr_timesteps": NR_TIMESTEPS,
             "chunk_len": 6,
             "epochs": 20,
             "verbose": 0},
         }
    # PREPARE CONFIG for ALL ALGORITHMS
    for module_and_class in algo_and_params.keys():
        parameters = algo_and_params[module_and_class]
        module_name = str(module_and_class[:module_and_class.rfind(".")])
        class_name = str(module_and_class[module_and_class.rfind(".") + 1:])
        # CREATE ANALYSIS
        t_analysis_single_algo = \
            get_single_algo(dag_identifier=dag_identifier,
                            class_name=class_name,
                            module_name=module_name,
                            parameters=parameters,
                            template_analysis=CONFIG_TEMPLATE_ANALYSIS)
        # connect the two
        t_move_locally_trainset.set_downstream(t_analysis_single_algo)
        t_move_locally_testset.set_downstream(t_analysis_single_algo)
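# Resulting DAG shape: the configuration task fans out to the train and test
# download/move branches, and each algorithm's analysis task runs only after
# both the train-set and the test-set parquet files are available locally.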
{
"history_steps": 48,
"slide_steps": 48,
"publish_per_windows" : 4,
"max_samples_for_train" : 1000,
"folder_training_time": "/opt/data_repo_volume/vm-datalake/time/demo/",
"local_scores_folder": "/opt/data_repo_volume/vm-datalake/scores/demo/",
"random_seed": 42
}
{
"hdfs_out_folder": "/project/it_cloud_data_analytics/demo_raw_parquet_test/",
"hdfs_cache_folder": "/project/it_cloud_data_analytics/demo_compressed_test/",
"local_cache_folder": "/opt/data_repo_volume/vm-datalake/demo_train/",
"code_project_name": "batch_014_{{ start_date }}_{{ end_date }}_{{ start_date_normalization }}_{{ end_date_normalization }}",
"date_start": "{{ start_date }}",
"date_end_excluded": "{{ end_date }}",
"normalization_out_folder": "/project/it_cloud_data_analytics/normalization",
"date_start_normalization": "{{ start_date_normalization }}",
"date_end_normalization_excluded": "{{ end_date_normalization }}",
"overwrite_on_hdfs": true,
"overwrite_normalization": true,
"aggregate_every_n_minutes": 10,
"history_steps": 48,
"slide_steps": 48,
"future_steps": 0,
"hostgroups": [
"cloud_compute/level2/batch/gva_project_014"
],
"selected_plugins": {
"swap_swapfile_swap_used": {
"type": "swap",
"type_instance": "used",
"plugin_name": "swap"
},
"swap_swapfile_swap_free": {
"type": "swap",
"type_instance": "free",
"plugin_name": "swap"
},
"vmem__vmpage_io_memory_in": {
"plugin_instance": "",
"type": "vmpage_io",
"type_instance": "memory",
"value_instance": "in",
"plugin_name": "vmem"
},
"vmem__vmpage_io_memory_out": {
"plugin_instance": "",
"type": "vmpage_io",
"type_instance": "memory",
"value_instance": "out",
"plugin_name": "vmem"
},
"cpu__percent_user": {
"plugin_instance": "",
"type": "percent",
"type_instance": "user",
"plugin_name": "cpu"
},
"cpu__percent_idle": {
"plugin_instance": "",
"type": "percent",
"type_instance": "idle",
"plugin_name": "cpu"
},
"cpu__percent_system": {
"plugin_instance": "",
"type": "percent",
"type_instance": "system",
"plugin_name": "cpu"
},
"load_longterm": {
"value_instance": "longterm",
"plugin_name": "load"
},
"memory__memory_free": {
"plugin_instance": "",
"type": "memory",
"type_instance": "free",
"plugin_name": "memory"
},
"interface__if_octets__tx": {
"type": "if_octets",
"type_instance": "",
"value_instance": "tx",
"plugin_name": "interface"
},
"interface__if_octets__rx": {
"type": "if_octets",
"type_instance": "",
"value_instance": "rx",
"plugin_name": "interface"
}
}
}
{
"hdfs_out_folder": "/project/it_cloud_data_analytics/demo_raw_parquet_train/",
"hdfs_cache_folder": "/project/it_cloud_data_analytics/demo_compressed_train/",
"local_cache_folder": "/opt/data_repo_volume/vm-datalake/demo_test/",
"code_project_name": "batch_014_{{ start_date }}_{{ end_date }}_{{ start_date_normalization }}_{{ end_date_normalization }}",
"date_start": "{{ start_date }}",
"date_end_excluded": "{{ end_date }}",
"normalization_out_folder": "/project/it_cloud_data_analytics/normalization",
"date_start_normalization": "{{ start_date_normalization }}",
"date_end_normalization_excluded": "{{ end_date_normalization }}",
"overwrite_on_hdfs": true,
"overwrite_normalization": true,
"aggregate_every_n_minutes": 10,
"history_steps": 48,
"slide_steps": 48,
"future_steps": 0,
"hostgroups": [
"cloud_compute/level2/batch/gva_project_014"
],
"selected_plugins": {
"swap_swapfile_swap_used": {
"type": "swap",
"type_instance": "used",
"plugin_name": "swap"
},
"swap_swapfile_swap_free": {
"type": "swap",
"type_instance": "free",
"plugin_name": "swap"
},
"vmem__vmpage_io_memory_in": {
"plugin_instance": "",
"type": "vmpage_io",
"type_instance": "memory",
"value_instance": "in",
"plugin_name": "vmem"
},
"vmem__vmpage_io_memory_out": {
"plugin_instance": "",
"type": "vmpage_io",
"type_instance": "memory",
"value_instance": "out",
"plugin_name": "vmem"
},
"cpu__percent_user": {
"plugin_instance": "",
"type": "percent",
"type_instance": "user",
"plugin_name": "cpu"
},
"cpu__percent_idle": {
"plugin_instance": "",
"type": "percent",
"type_instance": "idle",
"plugin_name": "cpu"
},
"cpu__percent_system": {
"plugin_instance": "",
"type": "percent",
"type_instance": "system",
"plugin_name": "cpu"
},
"load_longterm": {
"value_instance": "longterm",
"plugin_name": "load"
},
"memory__memory_free": {
"plugin_instance": "",
"type": "memory",
"type_instance": "free",
"plugin_name": "memory"
},
"interface__if_octets__tx": {
"type": "if_octets",
"type_instance": "",
"value_instance": "tx",
"plugin_name": "interface"
},
"interface__if_octets__rx": {
"type": "if_octets",
"type_instance": "",
"value_instance": "rx",
"plugin_name": "interface"
}
}
}
## Getting Started
You can use this module in three environments:
1. in the SWAN notebook
1. in your local machine
1. in a [CERN VM for Automatic Anomaly Detection](documentation/ad-pipeline-airflow.md)
### Prerequisites
You need to:
1. be able to authenticate via Kerberos for Spark access (an optional ticket check is sketched after this list):
```
kinit username@CERN.CH
```
2. have a Grafana token to access the InfluxDB and Elasticsearch data.
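If you are unsure whether a valid ticket already exists, the following sketch (assuming the standard MIT Kerberos client tools are installed) checks for one with `klist -s`:
``` python
# klist -s is silent and returns exit status 0 only if a valid ticket cache exists
import subprocess

have_ticket = subprocess.run(["klist", "-s"]).returncode == 0
print("Kerberos ticket found" if have_ticket else "No valid ticket, please run kinit")
```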
### Installing - SWAN (https://swan.web.cern.ch/)
1. Make sure you start SWAN connected to the right Spark cluster, which in general is the `General Purpose (Analytix)` cluster.
2. Paste this in your notebook:
``` python
# necessary to obtain the Kerberos credentials
import getpass
import os, sys
print("Please enter your kerberos password")
ret = os.system("echo \"%s\" | kinit" % getpass.getpass())
if ret == 0: print("Credentials created successfully")
else: sys.stderr.write('Error creating credentials, return code: %s\n' % ret)
```
3. Execute the cell and insert your password. Now you are logged in (a variant of this cell that avoids `os.system` is sketched after this list).
4. Paste this and execute:
```
! pip install --user git+https://:@gitlab.cern.ch:8443/cloud-infrastructure/data-analytics.git
```
5. Import the module that you need, e.g.:
``` python
from etl.spark_etl import cluster_utils
```
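As a side note, the Kerberos cell in step 3 can also be written without `os.system`, so that the password is never placed on a shell command line. This is only a sketch, assuming `kinit` is available in the notebook environment:
``` python
# pipe the password to kinit through stdin instead of echoing it via a shell
import getpass
import subprocess

principal = "username@CERN.CH"  # replace with your principal
password = getpass.getpass("Please enter your kerberos password: ")
proc = subprocess.run(["kinit", principal], input=password.encode(),
                      capture_output=True)
if proc.returncode == 0:
    print("Credentials created successfully")
else:
    print("Error creating credentials:", proc.stderr.decode())
```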
### Installing - Local machine
1. Log in directly from your shell with the command:
```
kinit username@CERN.CH
```
2. Insert your password.
3. Install the package directly from GitLab (a quick import check is sketched after this list):
```
pip install --user git+https://:@gitlab.cern.ch:8443/cloud-infrastructure/data-analytics.git
```
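To verify that the installation worked, a quick sanity check is to import one of the modules shown in the SWAN example above (a sketch; run it with the same Python interpreter used for `pip install --user`):
``` python
# if the package was installed correctly this import succeeds
from etl.spark_etl import cluster_utils  # noqa: F401

print("data-analytics package imported successfully")
```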
## Possible use cases
![use-case-diagram](documentation/diagrams/use-case-data-analytics.png)
## Example Notebooks
You can refer to the [notebook folder](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/-/tree/master/notebooks) to explore some examples of usage.