# Available Airflow DAGs (or Pipelines)
**work in progress**
## DAG description
We have four main types of DAGs, representing the three steps of the benchmark procedure plus a combination suitable for continuous monitoring:
1. batch_1_etl / shared_1_etl : downloads the data periodically from HDFS, aggregates them and saves them in EOS. You control what to download via the two configuration yaml files: CONFIG_TEMPLATE_FILE_NAME_TEST and CONFIG_TEMPLATE_FILE_NAME_TRAIN. Note that this step runs in Spark and the number of concurrent DAG runs affects the memory of the local VM, so be careful not to run too many of them concurrently (max_active_runs parameter of the DAG, see the sketch after this list).
1. batch_2_experiment / shared_2_experiment : reads the EOS data and runs the algorithms predefined in the analysis.yaml file (refer to the CONFIG_TEMPLATE_ANALYSIS variable). This step can run on the current VM or on the K8s cluster, using the Airflow operators DockerToPythonOperator and PythonInK8sOperator respectively. Note that this step publishes data both in the MONIT infrastructure (via fluentd) and in the score folder (typically in EOS), as specified in the analysis.yaml file.
1. batch_3_evaluation / shared_3_evaluation : reads the locally produced scores (in EOS), downloads the labeled data from Grafana and computes the performance of every algorithm over the benchmark period. It produces the results in the form of diagrams in the result folder defined in the analysis.yaml file (refer to the CONFIG_TEMPLATE_ANALYSIS variable).
1. batch_4_always_on / shared_4_always_on : an additional kind of pipeline that combines the first two steps (ETL + production of scores in MONIT) and runs continuously.
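As a reference for the max_active_runs knob mentioned above, here is a minimal, illustrative DAG definition (the name, schedule and start date are placeholder values, not the project's actual configuration):

```python
from datetime import datetime
from airflow import DAG

# Illustrative definition: max_active_runs caps how many runs of this
# DAG may execute concurrently, protecting the local VM's memory.
dag = DAG('batch_1_etl',
          schedule_interval='@daily',
          start_date=datetime(2020, 10, 30),
          max_active_runs=1)
```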
## Why a new operator
We recommend the use of our **custom operator**, since it allows the execution in Docker of a custom Python function. We felt the need to create this new operator because (at the time of writing, 30/10/2020) the [official Docker operator]() provided by Airflow has some shortcomings:
1. the **--log-driver** Docker attribute is not supported.
1. only **fixed commands** (or Python functions) can be passed to the official operator.
1. the **environment** parameter exposes all the environment variables in the Airflow UI, where they are visible. Therefore the official operator cannot use this field to pass sensitive information to the container (e.g. login credentials).
To tackle these points our custom operator supports:
1. Native **fluentd logging**. Every Docker container will log its output to the [Fluentd daemon](https://www.fluentd.org/) so that our results can then be redirected wherever you want (e.g. the Elasticsearch of the CERN MONIT infrastructure). See the sketch after this list.
1. **Customizable Python calls**. We can decide which function to run inside the Docker container and with which arguments, similarly to what the official Airflow [Python Operator]() does with the **python_callable** and **op_kwargs** parameters. The difference is that instead of using the worker we run the Python function in a self-contained container environment.
1. **Automatic authentication**. We use environment variables to pass the secrets and automatically authenticate with kinit.
> Bonus: **HDFS and Spark support**. Thanks to the new authentication mechanism we now support the usage of the **CERN Spark cluster analytix** directly from within the Docker container itself.
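Regarding the fluentd logging point above, the sketch below shows roughly the effect the operator achieves, expressed with the Docker Python SDK; the image name, script name and Fluentd address are our assumptions, not the operator's actual code:

```python
import docker
from docker.types import LogConfig

client = docker.from_env()
# Attach the fluentd log driver so that everything the container
# prints is shipped to the Fluentd daemon instead of staying local.
client.containers.run(
    'my-image',                                   # hypothetical image
    'python my_script.py my_function --param_1 Hello --param_2 World',
    log_config=LogConfig(type='fluentd',
                         config={'fluentd-address': 'localhost:24224'}))
```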
## Customizable python calls explained
To use custom Python calls we have to create a separate script (e.g. **<your_script>.py**) that contains our custom functions. This script is then transformed into a command line interface tool thanks to the [**Click** Python library](https://click.palletsprojects.com/).
Suppose we write a new function in the **<your_script>.py** file:
```python
# a dependency installed only in the container image
import library_available_only_in_the_container

def my_function(param_1, param_2):
    """Whatever-you-want function."""
    # computation
    # ...
    # for example
    result = str(param_1) + str(param_2)
    # ...
    return result
```
Thanks to a few modifications to this file, the Click library transforms this **<your_script>.py** file into a command line interface.
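Those modifications are not shown here; as a minimal sketch with plain Click decorators (the group name `cli` is our assumption), they might look like this:

```python
import click

@click.group()
def cli():
    """Entry point that groups all the functions of this script."""

# name= keeps the command spelled exactly like the function,
# since Click would otherwise rename it to 'my-function'.
@cli.command(name='my_function')
@click.option('--param_1', required=True)
@click.option('--param_2', required=True)
def my_function(param_1, param_2):
    """Whatever-you-want function."""
    result = str(param_1) + str(param_2)
    click.echo(result)

if __name__ == '__main__':
    cli()
```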
We can then call the function from the same folder:
```bash
[matteo@my-machine] python <your_script>.py my_function --param_1 Hello --param_2 World
```
The next step is done by our custom operator, which takes care of executing this function in the Docker container. All you need to do is instantiate our operator in your DAG in this way:
```python
with DAG('little_powerful_dag',
         default_args=default_args) as dag:
    my_dict_options = {
        "param_1": "Hello",
        "param_2": "World",
    }
    # Hypothetical instantiation, completing the truncated snippet:
    # check the operator's own documentation for the exact signature.
    my_task = \
        DockerToPythonOperator(task_id='my_task',
                               python_callable='my_function',
                               op_kwargs=my_dict_options)
```
## How to build a simple task for our DAG
Whenever you want to add a new task to your DAG that executes a Python function in a container, follow these steps:
1. open a Python script in the **LOCAL_SCRIPTS_AREA**. We recommend creating a single script in this area for each group of functionalities (e.g. one Python script grouping all ETL-related functions, one for all analysis-related functions and one for all plotting-related functions). Make sure it is a CLI command (via Click, as shown above)
1. create the new function and decorate it such that the Click library can do the rest to handle its arguments
1. create a new task in your DAG
## Advanced topics
1. How to use Spark
1. How to pass a dictionary as arguments (see the sketch after this list)
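Both topics are still to be documented. For the dictionary case, a common workaround (our assumption, not necessarily the project's actual mechanism) is to pass the dictionary as a single JSON string and decode it inside the function:

```python
import json
import click

@click.command()
@click.option('--params')  # a JSON-encoded dictionary, without spaces
def my_function(params):
    """Decode a dictionary passed as a single JSON string."""
    params_dict = json.loads(params)
    click.echo(params_dict)

if __name__ == '__main__':
    my_function()

# Invoked e.g. as:
#   python <your_script>.py --params '{"param_1":"Hello","param_2":"World"}'
```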
## Limitations
You cannot pass string arguments containing spaces, otherwise the Click library will interpret the parts as separate command options (e.g. passing `--param_1 Hello World` yields the value `Hello` plus a stray token `World`).
# Run on Kubernetes
To run your algorithm training on the Kubernetes (K8s) cluster you need to go through the following steps:
## Set up your cluster
1. Create a new K8s cluster. Go to the CERN OpenStack dashboard, switch to the developer project (because we have more computational power there) and create a cluster named: my-cluster-name
1. From the menu at the top right, download the .sh script used to connect to that project from any machine (e.g. Download OpenStack RC File v3).
1. Place this script in your VM running the docker-compose Anomaly Detection system
1. Source it and insert your password (the kinit password of the account that created the cluster):
   ```bash
   source <just_downloaded_script>.sh
   ```
1. Create the config file for the kubectl command:
   ```bash
   openstack coe cluster config my-cluster-name
   ```
This will produce a **config** file. Note that this file is extremely important, since it gives whoever uses it the power to command the cluster.
1. Export this path as an environment variable every time you want to manually inspect your cluster (see next section):
   ```bash
   export KUBECONFIG=<absolute_path_where_you_created_the_config_file>/config
   ```
## Optional - Manual inspection of your cluster
If you want to monitor your cluster via the command line to see what is going on, you will need the following libraries on your VM:
```bash
pip install python-openstackclient
pip install python-magnumclient
```
Then you will be able to call commands such as:
```bash
openstack coe cluster list
kubectl get all
```
## Give cluster commands to Airflow
Instruct Airflow on how to reach the cluster via the configuration file (under the hood it uses the [kubectl](https://kubernetes.io/docs/reference/kubectl/) command). In this way the containers that run Airflow (scheduler, etc.) can access the cluster.
1. Go to the **** file of Airflow containing all the passwords and insert this environment variable:
   ```bash
   export KUBECONFIG=<absolute_path_where_you_created_the_config_file>/config
   ```
## Use Pod operators
In your DAG you will then be able to run all the Pod Operators you want. (Remember to restart your docker-compose so that Airflow can pick up the new secrets.)
Example of pod operator in your DAG:
```python
t_bash_ls = KubernetesPodOperator(namespace='default',
                                  image='alpine',        # hypothetical image
                                  cmds=["sh", "-c"],
                                  arguments=["ls /sys"],
                                  labels={"foo": "bar"},
                                  name='bash-ls',        # hypothetical names
                                  task_id='bash_ls',
                                  get_logs=True)
```
Example of pod operator in your DAG using EOS. Note that you first need to install the Helm chart developed by Spyros and Ricardo (see the section on running EOS in K8s below).
```python
from kubernetes.client import models as k8s

# This is needed to have a reference to pass to the PodOperator
# so that it knows which volume to connect. Field values below are
# hypothetical completions of the truncated original snippet.
volume_eos = k8s.V1Volume(
    name='eos', host_path=k8s.V1HostPathVolumeSource(path='/var/eos'))
eos_volume_mount = k8s.V1VolumeMount(mount_path='/eos', name='eos',
                                     mount_propagation='HostToContainer')
security_eos = k8s.V1PodSecurityContext(
    se_linux_options=k8s.V1SELinuxOptions(type='spc_t'))
t_eos_ls = KubernetesPodOperator(namespace='default',
                                 image='alpine',  # hypothetical image
                                 cmds=["sh", "-c"],
                                 arguments=["ls /eos"],
                                 volumes=[volume_eos],
                                 volume_mounts=[eos_volume_mount],
                                 security_context=security_eos,
                                 name='eos-ls', task_id='eos_ls')
```
## Run EOS in K8s
To use EOS in K8s you need to be authenticated via kinit.
The proper way to do that in K8s is to use secrets.
Follow these steps:
1. Install the Helm chart mentioned above on the K8s cluster, so that every container in the cluster can access the EOS folders it is allowed to see.
1. Create a secret definition in a file named **user_secret.yaml** with this content:
   ```yaml
   apiVersion: v1
   kind: Secret
   metadata:
     name: my-kinit-secret
   type: Opaque
   stringData:               # plain-text values; use data: for base64
     username: <your-username-kinit>
     password: <your-password>
   ```
1. Save the secret in the K8s cluster by running:
   ```bash
   kubectl apply -f user_secret.yaml
   ```
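Once stored, the secret can be exposed to a pod as environment variables from within a DAG. A minimal sketch, assuming the Airflow 1.10.x import path and hypothetical variable names (KINIT_USER, KINIT_PASSWORD):

```python
from airflow.contrib.kubernetes.secret import Secret  # Airflow 1.10.x path

# Map each key of my-kinit-secret to an environment variable in the pod.
secret_user = Secret(deploy_type='env', deploy_target='KINIT_USER',
                     secret='my-kinit-secret', key='username')
secret_pass = Secret(deploy_type='env', deploy_target='KINIT_PASSWORD',
                     secret='my-kinit-secret', key='password')

t_kinit = KubernetesPodOperator(namespace='default',
                                image='alpine',      # hypothetical image
                                cmds=["sh", "-c"],
                                arguments=["echo $KINIT_USER"],
                                secrets=[secret_user, secret_pass],
                                name='kinit-demo', task_id='kinit_demo')
```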