Commit b5f2882f authored by Domenico Giordano

Merge branch 'qa-v0.4' into qa-v0.4_stiven

parents 0ee18e5a 159daa69
# Data Analytics
| [qa-v0.4](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/-/commits/qa-v0.4) | [v0.3](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/-/commits/v0.3) |
| :-: | :-: |
| ![pipeline status qa-v0.4](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/badges/qa-v0.4/pipeline.svg) | ![pipeline status v0.3](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/badges/v0.3/pipeline.svg) |
| ![coverage report qa-v0.4](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/badges/qa-v0.4/coverage.svg) | ![coverage report v0.3](https://gitlab.cern.ch/cloud-infrastructure/data-analytics/badges/v0.3/coverage.svg) |

The project contains a suite of tools to run data analytics pipelines on the monitoring data of the CERN Cloud Infrastructure.
A central part of this project is the Anomaly Detection on time series data.
This time series data can come from:
- metrics measured for each hypervisor in the Data Centre.
- time series derived from log-file analysis.
The CI/CD of this project is used to:
1. Run unit tests and quality checks for the implemented code
1. Build Docker images with pre-installed libraries needed for the project's scope
1. Run functional tests of the Data Analytics pipeline and its components
Each subfolder of the repository contains a README file with extensive documentation.<br>
This is a guide map of the repository:
1. ETL libraries ([etl](etl))<br>
Implement the extraction of data from the different monitoring databases: InfluxDB, ElasticSearch, HDFS
1. Anomaly detection libraries ([adcern](adcern))<br>
Implement anomaly detection models, based on pyOD, traditional ML and DL methods (a short pyOD usage sketch follows the figure below)
1. Tests suite ([tests](tests))<br>
Unit tests of the ETL libraries and of the pipelines' components
1. Docker image definition ([docker-images](docker-images))<br>
Dockerfiles for the images used in this project
1. Airflow-based Anomaly Detection System ([control_room](control_room))<br>
Setup and run the Anomaly Detection System
1. Javascript Grafana extension ([grafana_extension](grafana_extension))<br>
Implement an extension of the Grafana Annotation panel, modifying the Grafana JS code
All these components are needed to deploy the Anomaly Detection System described in the figure below.
<br><img src="documentation/images/AD_system_technologies.png" width="70%"><br>
## Where to start
1. For a general introduction to this activity, see the [ITTF seminar](https://indico.cern.ch/event/1012703/)
1. For interactive examples, see [examples](examples)
1. For the Airflow deployment, see [control_room](control_room)
The Anomaly Detection task can also be run in an automatic way.
To do that we rely on [Apache Airflow](https://airflow.apache.org/).
To provide an easy-to-use environment, we encapsulated all the required blocks (Airflow included) in Docker containers that can be run via *Docker Compose*.
The Airflow Docker Compose setup is heavily based on the examples found at https://github.com/puckel/docker-airflow
This area is called `Control room` and contains the procedures to deploy the Airflow-based Anomaly Detection System.
The folder includes:
1. Installation scripts ([install_AD.sh](install_AD.sh))<br>
To be run once, when a new machine needs to be configured
1. Docker-compose configuration ([airflow-compose](airflow-compose))<br>
To set up the Airflow system
1. Docker-swarm configuration - w.i.p. ([docker-swarm](docker-swarm))<br>
## Getting started
The set of components that will be deployed with the following procedure is described in this image:
<br><img src="documentation/images/AD_components_deployed.png" width="70%"><br>
We suggest running on a dedicated virtual machine (VM) that can be provisioned on the [OpenStack CERN Platform](https://openstack.cern.ch/).
<br>For an initial test we suggest starting with a flavor providing at least 7 GB of RAM.
1. Log in to the VM (**tested on CentOS 7**) with the following port forwarding:
Now that Airflow is up and running, we can test the Anomaly Detection System and its algorithms on a demo scenario.
1. Open the Airflow UI: http://localhost:8080/
1. Select the *DAGs* tab from the Airflow menu.
1. Search for the DAG named **dag_ad_demo** and click on its name.
1. Go to the *Graph View* tab to see the interconnections between the different tasks.
1. Click on the **on/off switch** next to the header *DAG: dag_ad_demo* to enable it.
**Congratulations!** You just started your first Anomaly Detection pipeline. Check its successful termination via the *Tree View*: when the last box is dark green, the pipeline has completed successfully.
## Additional Documentation
# Anomaly Detection System driven by Airflow
**NB**:
In these examples there are dummy passwords stored in the **secret.sh** file.
Those passwords are here only as an example for a simple local test.
**!!! Do not commit real production passwords !!!**
## Docker Operator
The Airflow operators usable in our use case are the BashOperator, the DockerOperator, the DockerSwarmOperator and the Kubernetes operator.
Documentation about the Docker operator can be found at:
- https://airflow.apache.org/docs/stable/_modules/airflow/operators/docker_operator.html#DockerOperator
- https://marclamberti.com/blog/how-to-use-dockeroperator-apache-airflow/

Documentation about the Docker Swarm operator:
- https://airflow.apache.org/docs/stable/_modules/airflow/contrib/operators/docker_swarm_operator.html#DockerSwarmOperator._run_image
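
As a quick orientation, here is a minimal sketch of how a DockerOperator task can be declared inside a DAG. It assumes Airflow 1.10-style import paths (matching the documentation links above); the image, command and schedule are placeholders, not the ones used in this project.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.docker_operator import DockerOperator

with DAG(dag_id='docker_operator_example',
         start_date=datetime(2021, 1, 1),
         schedule_interval=None,
         catchup=False) as dag:

    run_in_container = DockerOperator(
        task_id='run_in_container',
        image='python:3.7-slim',  # placeholder image
        command='python -c "print(42)"',
        docker_url='unix://var/run/docker.sock',  # talk to the host Docker daemon
        api_version='auto',
        auto_remove=True,  # remove the container once the task finishes
    )
```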
To start/stop Airflow docker-compose use the following commands:
1. Initialize: `source ./start_ad_system.sh`
1. Reset (!!!CAVEAT: this removes all stored DAG executions and logs!!!): `start_compose_AD prune`
1. Stop AD Airflow: `stop_compose_AD`
1. Stop AD Airflow and remove volumes (same effect as in 2.): `stop_compose_AD --volumes`
1. Start/Re-start AD Airflow (w/o removing volumes): `start_compose_AD`
## Troubleshooting
In order to guarantee authentication to the **CERN Spark cluster** and to **EOS**, the Kerberos ticket cache is percolated from the Airflow docker-compose to the running *sparknotebook* container.
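
As an illustration of what this percolation can look like at the operator level, the sketch below assumes the ticket cache is a file that gets bind-mounted into the container and exposed via the KRB5CCNAME environment variable. The paths, image and task names are illustrative, not the project's actual configuration.

```python
from airflow.operators.docker_operator import DockerOperator

# Hypothetical names: the host-side ticket cache path and the image are placeholders.
sparknotebook_task = DockerOperator(
    task_id='sparknotebook_with_kerberos',
    image='sparknotebook:latest',  # placeholder image name
    command='klist',  # verify that the ticket cache is visible inside the container
    volumes=['/tmp/krb5cc_airflow:/tmp/krb5cc_airflow:ro'],  # bind-mount the ticket cache
    environment={'KRB5CCNAME': 'FILE:/tmp/krb5cc_airflow'},  # point Kerberos at it
    auto_remove=True,
)
```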
# Run on Kubernetes (THIS PART REFERS TO OLD APPROACH. TO BE REVIEWED)
Steps to run the algorithm training on the Kubernetes (K8s) cluster are described here:
1. Set up k8s cluster, following documentation reported [here](../k8s/README.md)
def ad_etl_dag(dag_id='give_me_a_name', override_params={}):
        trigger_rule=trigger_rule,
        dag=dag
    )

    # List of tasks to define based on a BashOperator.
    # Each entry includes the tuple: (task_id, step of the AD pipeline to call from inside the BashOperator script, trigger rule)
    ad_tasks = [
        ('check_local_data_presence', 'data_mining data_presence --resource_file ad_config_file.json', 'all_success'),
        ('spark_normalization_presence', 'data_mining normalization_presence --resource_file ad_config_file.json', 'all_failed'),
        ('spark_mv_data_to_local', 'data_mining copy_locally --resource_file ad_config_file.json', 'all_success'),
    ]

    # Instantiate the AD tasks
    for atask in ad_tasks:
        globals()[atask[0]] = return_configured_BashOperator(*atask)

    # Define the task dependency flow
    # Start by checking the local data presence and, if the data is already there, end the pipeline
    check_local_data_presence >> dag_exit_status
    # Otherwise, if the data presence check fails, check the normalization presence
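
To make the pattern above easier to follow outside the project's code base, here is a self-contained sketch of the same idea: build BashOperators from (task_id, command, trigger_rule) tuples and wire them with the `>>` operator. The task names and commands are dummies, and the helper function is an illustration of the pattern, not the project's return_configured_BashOperator.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(dag_id='trigger_rule_pattern_example',
          start_date=datetime(2021, 1, 1),
          schedule_interval=None)


def make_bash_task(task_id, command, trigger_rule):
    """Build a BashOperator whose trigger_rule decides when it may run,
    e.g. 'all_success' (upstream succeeded) or 'all_failed' (upstream failed)."""
    return BashOperator(task_id=task_id,
                        bash_command=command,
                        trigger_rule=trigger_rule,
                        dag=dag)


tasks = [
    ('check_data', 'echo "check data"', 'all_success'),
    ('recover', 'echo "recover"', 'all_failed'),
    ('process', 'echo "process"', 'all_success'),
]
operators = {task_id: make_bash_task(task_id, cmd, rule) for task_id, cmd, rule in tasks}

# If check_data fails, only 'recover' runs; if it succeeds, only 'process' runs.
operators['check_data'] >> [operators['recover'], operators['process']]
```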
echo -e "\nVerification: running_image is $running_image\n"
cat > docker_run_script.sh <<'EOF'
#!/bin/bash
env
whoami;
pwd;
id
export KRB5CCNAME
echo ${config_base_64} | base64 -d > ad_config_file.json;
cat ad_config_file.json;
klist -c {{var.value.KRB5CCNAME}};
local_cache_folder=`python -c "import json; f=open(\"ad_config_file.json\"); print(json.loads(f.read())[\"local_cache_folder\"])"`
echo "local_cache_folder is $local_cache_folder"
#mkdir -p $local_cache_folder;
#[[ "$?" -ne 0 ]] && exit 1;
source set_spark_analytix.sh;
{{params.ad_task}};
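
For context, the `config_base_64` value consumed above is a JSON configuration serialized to base64 so that it can travel through Airflow templating as a single string. A minimal sketch of producing and decoding such a payload in Python (the configuration keys are illustrative):

```python
import base64
import json

# Illustrative configuration; the real ad_config_file.json has project-specific keys.
ad_config = {
    'local_cache_folder': '/tmp/ad_cache',
    'date_start': '2021-01-01',
}

# Encode: this is the kind of string that ends up in ${config_base_64}.
config_base_64 = base64.b64encode(json.dumps(ad_config).encode('utf-8')).decode('ascii')
print(config_base_64)

# Decode: equivalent to `echo ${config_base_64} | base64 -d > ad_config_file.json`.
decoded = json.loads(base64.b64decode(config_base_64))
print(decoded['local_cache_folder'])
```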
date_end_normalization_excluded: "__templated__"

# List of plugins to mine.
# Note that it is a dictionary where every key represents the name your plugin
# has and the value is a dictionary with:
# 'plugin_data_path': the HDFS location where the plugin data is stored
# 'plugin_filter': the plugin filter to select the interesting data
selected_plugins:
  load_longterm:
    plugin_data_path: "/project/monitoring/collectd/load"
    plugin_filter: "value_instance == 'longterm' and plugin == 'load'"
  cpu__percent_idle:
    plugin_data_path: "/project/monitoring/collectd/cpu"
    plugin_filter: "type == 'percent' and type_instance == 'idle' and plugin == 'cpu'"
  memory__memory_free:
    plugin_data_path: "/project/monitoring/collectd/memory"
    plugin_filter: "type == 'memory' and type_instance == 'free' and plugin == 'memory'"
  vmem__vmpage_io_memory_in:
    plugin_data_path: "/project/monitoring/collectd/vmem"
    plugin_filter: "type == 'vmpage_io' and type_instance == 'memory' and value_instance == 'in' and plugin == 'vmem'"
  swap_swapfile_swap_free:
    plugin_data_path: "/project/monitoring/collectd/swap"
    plugin_filter: "type == 'swap' and type_instance == 'free' and plugin == 'swap'"
## Possible use cases
![use-case-diagram](documentation/images/use-case-data-analytics.png)
## Example Notebooks
Here is the UML class diagram of the module:
![class-diagram](documentation/images/grafana_etl_package.png)
## Spark ETL