Commit d47e1c96 authored by Domenico Giordano

inter2

parent d2901ff0
@@ -13,65 +13,13 @@ We have four main types of DAGs representing the three steps in the benchmark pr
## Our Operators know nothing!
We exploit the BashOperator to run a docker container, generally based on the `sparknotebook` container available in the gitlab registry of this same project.
All the intelligence implemented in the `adcern lib` runs in the container. This choice allows running the same processes independently from Airflow. Pipelines can therefore be implemented and tested outside Airflow, and in particular can also be tested in Jupyter notebooks, provided that the notebook starts from the `sparknotebook` image. An example of this approach is provided in the CI [test](../../../tests/adcern/integration) of adcern.
We recommend the use of our **custom operator**, since it allows executing a custom python function in Docker. We felt the need to create this new operator because (at the time of writing, 30/10/2020) the [official docker operator](https://airflow.apache.org/docs/stable/_api/airflow/operators/docker_operator/index.html) provided by Airflow has some shortcomings:
1. the **--log-driver** Docker attribute is not supported.
1. only **fixed commands** (or a fixed python function) can be passed to the official operator.
1. the **environment** parameter exposes all the environment variables it passes in the Airflow UI, where they are visible. Therefore the official operator cannot use this field to pass sensitive information to the container (e.g. login credentials).
To tackle these points our custom operator supports:
1. Native **fluentd logging**. Every Docker container will log its output to the [Fluentd daemon](https://docs.docker.com/config/containers/logging/fluentd/) so that our results can then be redirected wherever you want (e.g. Elasticsearch of the CERN MONIT Infrastructure).
1. **Customizable python calls**. We can decide which function to run inside the Docker container and with which arguments, similarly to what the official Airflow [Python Operator](https://airflow.apache.org/docs/stable/howto/operator/python.html) does with the **python_callable** and **op_kwargs** parameters. The difference is that instead of using the worker we run the python function in a self-contained container environment.
1. **Automatic authentication**. We use environment variables to pass the secrets and automatically authenticate with kinit (a sketch is shown after this list).
> Bonus: **HDFS and Spark support**. Thanks to the new authentication mechanism we now support the usage of the **CERN Spark cluster analytix** directly from within the Docker container itself.
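As a minimal sketch of the **automatic authentication** idea: credentials are read from environment variables injected into the container and used to obtain a Kerberos ticket with kinit. The variable names (`KRB_USER`, `KRB_PASSWORD`) and the helper `kinit_from_env` below are illustrative, not the exact adcern implementation.
```python
import os
import subprocess


def kinit_from_env():
    """Obtain a Kerberos ticket using credentials passed via the environment."""
    # Illustrative variable names: the secrets are injected at `docker run -e ...`
    # time, so they never appear in the Airflow UI.
    principal = os.environ["KRB_USER"]   # e.g. "myuser@CERN.CH"
    password = os.environ["KRB_PASSWORD"]
    # kinit reads the password from stdin, keeping it off the command line.
    subprocess.run(["kinit", principal], input=password.encode(), check=True)


if __name__ == "__main__":
    kinit_from_env()
```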
## Customizable python calls explained
To use custom python calls we have to create a separate script (e.g. *example_functions.py*) that contains our custom functions. This script is then transformed into a command line interface tool thanks to the [**Click** python library](https://pypi.org/project/click/).
Suppose we write a new function in the *example_functions.py* file:
```python
# "library-available-only-in-the-container" is a placeholder for any dependency
# installed only inside the sparknotebook image.
import library_available_only_in_the_container


def my_function(param_1, param_2):
    """Whatever-you-want function."""
    # computation
    # ...
    # for example
    result = str(param_1) + str(param_2)
    # ...
    print(result)
```
Thanks to a few modifications to this file, the Click library turns *example_functions.py* into a command line interface, and from the same folder we can call the function in this way:
```bash
[matteo@my-machine] python example_functions.py my_function --param_1 Hello --param_2 World
```
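The "few modifications" mentioned above essentially amount to decorating the functions with Click. A minimal sketch of what they could look like, assuming the standard `click` group/command decorators (the exact adcern wiring may differ):
```python
import click


@click.group()
def cli():
    """Command line entry point exposing the functions of this file."""
    pass


@cli.command(name="my_function")
@click.option("--param_1", required=True)
@click.option("--param_2", required=True)
def my_function(param_1, param_2):
    """Whatever-you-want function, now callable from the command line."""
    result = str(param_1) + str(param_2)
    print(result)


if __name__ == "__main__":
    cli()
```
With this entry point, the invocation shown above resolves to the decorated `my_function` command.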
The next step is done by our custom operator, which takes care of executing this function in the docker container. All you need to do is instantiate our operator in your DAG in this way:
```python
from airflow import DAG
# DockerToPythonOperator is the custom operator defined in this project;
# its exact import path depends on where the operator module lives.

with DAG('little_powerful_dag',
         default_args=default_args) as dag:

    my_dict_options = {
        "param_1": "Hello",
        "param_2": "World",
    }

    my_task = DockerToPythonOperator(
        task_id="your_python_call_in_docker",
        python_script="example_functions.py",
        function_name="my_function",
        dict_options=my_dict_options)
```
We exploit the **BashOperator** to run a docker container, generally based on the **sparknotebook** container available in the gitlab registry of this same project.
All the intelligence implemented in the **adcern lib** runs in the *sparknotebook* container. This choice allows running the same processes independently from Airflow. Pipelines can therefore be implemented and tested outside Airflow, and in particular can also be tested in Jupyter notebooks, provided that the notebook starts from the *sparknotebook* image. An example of this approach is provided in the CI [test](../../../tests/adcern/integration) of adcern.
The reason why we don't use the [official DockerOperator](https://airflow.apache.org/docs/stable/_api/airflow/operators/docker_operator/index.html) of Airflow, preferring instead to pass the *docker run* command to the BashOperator via a [script](./scripts/anomaly_detection_tasks.sh), is a limitation of the DockerOperator:
1. the **--log-driver** Docker attribute is not supported. We use the log driver to configure the **fluentd logging** service that collects the anomaly detection scores. By issuing the docker run command directly we can configure it ourselves: every Docker container logs its output to the [Fluentd daemon](https://docs.docker.com/config/containers/logging/fluentd/) so that results can then be redirected wherever you want (e.g. Elasticsearch of the CERN MONIT Infrastructure).
In order to guarantee the authentication to the **CERN Spark cluster** and to **EOS**, the Kerberos ticket cache is percolated from the Airflow docker-compose to the running *sparknotebook* container.
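A hedged sketch of how such a task could be wired in a DAG is shown below: the **BashOperator** receives a *docker run* command that sets the fluentd log driver and mounts the Kerberos ticket cache. The image name, ticket-cache path and fluentd address are placeholders; the real command lives in the [script](./scripts/anomaly_detection_tasks.sh) mentioned above.
```python
from airflow.operators.bash_operator import BashOperator

# Placeholder values: the real image, cache path and fluentd address are set in
# scripts/anomaly_detection_tasks.sh and in the Airflow docker-compose setup.
docker_cmd = (
    "docker run --rm "
    "--log-driver=fluentd "
    "--log-opt fluentd-address=localhost:24224 "
    "--log-opt tag=adcern.scores "
    "-v /tmp/krb5cc_1000:/tmp/krb5cc_1000 "    # percolated Kerberos ticket cache
    "-e KRB5CCNAME=FILE:/tmp/krb5cc_1000 "
    "my-registry/sparknotebook "               # placeholder for the project image
    "python example_functions.py my_function --param_1 Hello --param_2 World"
)

run_in_docker = BashOperator(
    task_id="run_adcern_step_in_docker",
    bash_command=docker_cmd,
    dag=dag,  # assumes an existing DAG object, as in the example above
)
```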
## How to build a simple task for our DAG
......