# How to create your Airflow DAG (or Pipeline)
Date: 30 November 2020 - Author: Matteo Paltenghi
Here we explain the recommended way of creating DAGs. In the **dags** folder you can already find some example DAGs.
## Why a new operator
We recommend using our **custom operator**, since it allows running a custom Python function inside Docker. We felt the need to create this new operator because (at the time of writing, 30/10/2020) the [official Docker operator](https://airflow.apache.org/docs/stable/_api/airflow/operators/docker_operator/index.html) provided by Airflow has some shortcomings:
1. the **--log-driver** Docker attribute is not supported.
1. only **fixed commands** (or Python functions) can be passed to the official operator.
1. the **environment** parameter exposes all the environment variables passed to it in the Airflow UI, where they are visible. Therefore the official operator cannot use those fields to pass sensitive information to the container (e.g. login credentials).
To tackle these points our custom operator supports:
1. Native **Fluentd logging**. Every Docker container will log its output to the [Fluentd daemon](https://docs.docker.com/config/containers/logging/fluentd/) so that our results can then be redirected wherever you want (e.g. Elasticsearch of the CERN MONIT infrastructure).
1. **Customizable Python calls**. We can decide which function to run inside the Docker container and with which arguments, similarly to what the official Airflow [Python Operator](https://airflow.apache.org/docs/stable/howto/operator/python.html) does with the **python_callable** and **op_kwargs** parameters. The difference is that instead of using the worker we run the Python function in a self-contained container environment.
1. **Automatic authentication**. We use environment variables to pass the secrets and automatically authenticate with kinit.
> Bonus: **HDFS and Spark support**. Thanks to the new authentication mechanism we now support the use of the **CERN Spark cluster analytix** directly from within the Docker container itself.
## Customizable python calls explained
To use custom Python calls we have to create a separate script (e.g. *example_functions.py*) that contains our custom functions. This script is then transformed into a command-line interface tool thanks to the [**Click** Python library](https://pypi.org/project/click/).
Suppose you write a new function in the *example_functions.py* file:
```python
# import a library that is available only in the container image
def my_function(param_1, param_2):
    """Whatever-you-want function."""
    # computation
    # ...
    # for example
    result = str(param_1) + str(param_2)
    # ...
    print(result)
```
Thanks to a few modifications to this file, the Click library transforms *example_functions.py* into a command-line interface, and we can call the function from the same folder in this way:
```bash
[matteo@my-machine] python example_functions.py my_function --param_1 Hello --param_2 World
```
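For illustration, the "few modifications" could look roughly like the sketch below, which uses the standard Click group/command decorators; the exact layout of the repository's *example_functions.py* may differ.
```python
import click


@click.group()
def cli():
    """Entry point that exposes the functions of this file as sub-commands."""
    pass


@cli.command(name='my_function')  # keep the underscore name used in the call above
@click.option('--param_1', required=True)
@click.option('--param_2', required=True)
def my_function(param_1, param_2):
    """Whatever-you-want function."""
    result = str(param_1) + str(param_2)
    print(result)


if __name__ == '__main__':
    cli()
```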
The next step is handled by our custom operator, which takes care of executing this function in the Docker container. All you need to do is instantiate our operator in your DAG in this way:
```python
with DAG('little_powerful_dag',
         default_args=default_args) as dag:

    my_dict_options = {
        "param_1": "Hello",
        "param_2": "World",
    }

    my_task = DockerToPythonOperator(
        task_id="your_python_call_in_docker",
        python_script="example_functions.py",
        function_name="my_function",
        dict_options=my_dict_options)
```
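The snippet assumes the usual DAG boilerplate around it. A minimal sketch of that boilerplate is shown below; the import path of **DockerToPythonOperator** depends on where it lives in this repository, so it is only indicated as a comment, and the **default_args** values are illustrative.
```python
from datetime import datetime, timedelta

from airflow import DAG
# from <this_repository_module> import DockerToPythonOperator  # actual import path depends on the repo layout

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 11, 30),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
```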
## How to build a simple task for our DAG
Whenever you want to add a new task to your DAG that executes a Python function in a container, you need to follow these steps:
1. Open a Python script in the **LOCAL_SCRIPTS_AREA**. We recommend creating a single script in this area for most of the functionality (e.g. one Python script to group all ETL-related functions, one for all analysis-related functions and one for all plotting-related functions). Make sure it is a CLI command (see also the full sketch after this list):
```python
# EXAMPLE OF MINIMAL CLI FILE
```
1. Create the new function and decorate it so that the Click library can do the rest to handle its arguments:
```python
# EXAMPLE OF FUNCTION
```
1. Create a new task in your DAG:
```python
# EXAMPLE OF NEW DAG TASK
```
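The three placeholders above could be filled along the following lines. This is a minimal sketch assuming the Click-based layout described earlier; the file name *etl_functions.py*, the function *download_data* and its options are purely illustrative.
```python
# etl_functions.py -- minimal CLI file (step 1); the file name is illustrative
import click


@click.group()
def cli():
    """Group all the functions of this file into one command-line interface."""
    pass


# New decorated function (step 2); name and options are illustrative
@cli.command(name='download_data')
@click.option('--start_date', required=True)
@click.option('--end_date', required=True)
def download_data(start_date, end_date):
    """Download the raw data for the given time window."""
    print("Downloading from %s to %s" % (start_date, end_date))


if __name__ == '__main__':
    cli()
```
The corresponding task (step 3) is instantiated inside the `with DAG(...)` block exactly as shown in the previous section:
```python
# New task in your DAG (step 3); the task_id is illustrative
t_download = DockerToPythonOperator(task_id="download_data_in_docker",
                                    python_script="etl_functions.py",
                                    function_name="download_data",
                                    dict_options={"start_date": "2020-11-01",
                                                  "end_date": "2020-11-30"})
```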
## Advanced topics
1. How to use Spark
1. How to pass a dictionary as arguments
## Limitations
You cannot pass string arguments containing spaces, otherwise the Click library will interpret the words after the space as separate command options.
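For example, a value like the one sketched below would break, because the space makes the second word look like a separate token on the generated command line (illustrative example):
```python
# Would fail: the generated call becomes something like
#   python example_functions.py my_function --param_1 Hello World --param_2 ok
# and Click treats "World" as an unexpected extra token.
my_dict_options = {
    "param_1": "Hello World",  # space in the value
    "param_2": "ok",
}
```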
# Run on Kubernetes
To run your algorithm training on the Kubernetes (K8s) cluster you need to go through the following steps:
## Set up your cluster
1. Create a new K8s cluster. Go to openstack.cern.ch, go to the developer project (because we have more computational power there) and create a cluster named: my-cluster-name.
1. Download from the top right the .sh script used to connect to that project from any machine (Tools > Download OpenStack RC V3 ...).
1. Place this script in your VM running the docker-compose Anomaly Detection system.
1. Source it and insert your password (the kinit password of the account that created the cluster):
```
source <just_downloaded_script>.sh
```
1. Create the config file for the kubectl command:
```
openstack coe cluster config my-cluster-name
```
This will produce a **config** file. Note that this file is sensitive, since it gives whoever uses it the power to control the cluster.
1. Export this path as an environment variable every time you want to manually inspect your cluster (see next section):
```
export KUBECONFIG=<absolute_path_where_you_created_the_config_file>/config
```
## Optional - Manual inspection of your cluster
If you want to monitor your cluster via the command line to see what is going on, you will need the following libraries on your VM:
```
pip install python-openstackclient
pip install python-magnumclient
```
Then you will be able to call commands such as:
```
openstack coe cluster list
kubectl get all
```
## Give cluster commands to Airflow
Tell Airflow how to reach the cluster via the configuration file (under the hood it uses the [kubectl](https://kubernetes.io/docs/reference/kubectl/overview/) command). In this way the containers that run Airflow (scheduler, etc.) can access the cluster.
1. Go to Airflow's **secret.sh** file containing all the passwords and insert this environment variable:
```
export KUBECONFIG=<absolute_path_where_you_created_the_config_file>/config
```
## Use Pod operators
In your DAG you will then be able to run all the Pod Operators you want. (Remember to restart your docker-compose so that Airflow can pick up the new secrets.)
Example of pod operator in your DAG:
```python
t_bash_ls = KubernetesPodOperator(namespace='default',
                                  image="python:3.6-stretch",
                                  cmds=["sh", "-c"],
                                  arguments=["ls /sys"],
                                  # cmds=["python", "-c"],
                                  # arguments=["print('hello world')"],
                                  labels={"foo": "bar"},
                                  name="bash-ls-task",
                                  task_id="bash-ls-task",
                                  is_delete_operator_pod=True,
                                  in_cluster=False,
                                  dag=dag
                                  )
```
Example of a pod operator in your DAG using EOS.
Note that you need to run the Helm chart developed by Spyros and Ricardo, available here: https://gitlab.cern.ch/helm/charts/cern/-/tree/master/eosxd.
```python
from kubernetes.client import models as k8s

# - CREATE VOLUME
# It must have already been created by the EOS Helm chart:
# https://gitlab.cern.ch/helm/charts/cern/-/tree/master/eosxd

# - DEFINE VOLUME
# This is needed to have a reference to pass to the PodOperator
# so that it knows which volume to connect.
volume_eos = k8s.V1Volume(
    name='eos',
    host_path=k8s.V1HostPathVolumeSource(path='/var/eos')
)

# - VOLUME MOUNTING
eos_volume_mount = k8s.V1VolumeMount(mount_path='/eos',
                                     name='eos',
                                     mount_propagation='HostToContainer')

# - SECURITY
security_eos = k8s.V1PodSecurityContext(
    se_linux_options=k8s.V1SELinuxOptions(
        type='spc_t'
    )
)

t_eos_ls = KubernetesPodOperator(namespace='default',
                                 image="python:3.6-stretch",
                                 cmds=["sh", "-c"],
                                 arguments=["ls /eos"],
                                 name="eos-ls-task",
                                 task_id="eos-ls-task",
                                 is_delete_operator_pod=True,
                                 in_cluster=False,
                                 volumes=[volume_eos],
                                 volume_mounts=[eos_volume_mount],
                                 security_context=security_eos,
                                 dag=dag
                                 )
```
## Run EOS in K8s
To use EOS in K8s you need to be authenticated via kinit.
The proper way to do that in K8s is to use secrets.
Follow these steps:
1. Install the following Helm chart on the K8s cluster so that every container in the cluster can access the EOS folders it is allowed to see:
https://gitlab.cern.ch/helm/charts/cern/-/tree/master/eosxd
1. Create a secret definition in a file named **user_secret.yaml** with this content:
```
apiVersion: v1
kind: Secret
metadata:
  name: my-kinit-secret
type: kubernetes.io/basic-auth
stringData:
  username: <your-username-kinit>
  password: <your-password>
```
1. Save the secret in the K8s cluster by running:
```
kubectl apply -f user_secret.yaml
```
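Once the secret exists in the cluster, a pod launched from your DAG can consume it, for instance as environment variables. Below is a minimal sketch that extends the earlier pod operator examples and assumes the **Secret** helper shipped with Airflow's Kubernetes support; the exact import path depends on your Airflow version, and the task name is illustrative.
```python
from airflow.kubernetes.secret import Secret  # import path may differ per Airflow version

# Map the two keys of my-kinit-secret to environment variables inside the pod.
secret_username = Secret(deploy_type='env',
                         deploy_target='KINIT_USERNAME',
                         secret='my-kinit-secret',
                         key='username')
secret_password = Secret(deploy_type='env',
                         deploy_target='KINIT_PASSWORD',
                         secret='my-kinit-secret',
                         key='password')

t_kinit_check = KubernetesPodOperator(namespace='default',
                                      image="python:3.6-stretch",
                                      cmds=["sh", "-c"],
                                      arguments=["echo authenticated as $KINIT_USERNAME"],
                                      secrets=[secret_username, secret_password],
                                      name="kinit-check-task",
                                      task_id="kinit-check-task",
                                      is_delete_operator_pod=True,
                                      in_cluster=False,
                                      dag=dag)
```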