Commit ee5fb778 authored by Domenico Giordano

handling of dag exit status

parent 367adcb3
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.state import State
from builtins import range
from datetime import timedelta, datetime
@@ -8,6 +11,27 @@ from datetime import timedelta, datetime
import json
from mergedeep import merge, Strategy

def final_status(**kwargs):
    '''Fail this task (and hence the DAG run) only when none of its direct upstream tasks succeeded.'''
    print(kwargs)
    for task_instance in kwargs['dag_run'].get_task_instances():
        print("task_id %30s \t current_state %20s" % (task_instance.task_id, task_instance.current_state()))
    this_task = kwargs['task']
    print('this task_id is %s \t upstream task_ids %s' % (this_task.task_id, this_task.upstream_task_ids))
    total_upstreams = len(this_task.upstream_task_ids)
    counter = 0
    # Count the direct upstream tasks that did not end in SUCCESS
    for task_instance in kwargs['dag_run'].get_task_instances():
        if task_instance.task_id in this_task.upstream_task_ids and \
                task_instance.current_state() != State.SUCCESS:
            print('Checking parents: parent with task_id %s has current_state %s' % (task_instance.task_id,
                                                                                      task_instance.current_state()))
            counter += 1
    if counter == total_upstreams:  # all upstream tasks failed
        raise Exception("Task {} has collected failures. Failing this DAG run".format(this_task.task_id))
    return True
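
# NB: raising an exception inside the callable marks the calling task as failed;
# dag_exit_status (defined below) is a leaf task of the DAG, so its failure makes
# the whole DAG run end in a failed state.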
def ad_etl_dag(dag_id='give_me_a_name', override_params={}):
    '''
    default_params: DAG default parameters passed to DAGs
@@ -25,6 +49,7 @@ def ad_etl_dag(dag_id='give_me_a_name', override_params={}):
        'start_date': datetime(2021, 1, 1),
        'end_date': datetime(2021, 4, 1),
        'ad_config': {
            'running_image': 'gitlab-registry.cern.ch/cloud-infrastructure/data-analytics/sparknotebook:qa-v0.4',
            'template_config_file': '/opt/airflow/dags/scripts/anomaly_detection_tasks_cfg.json',
            'hostgroups': ['cloud_compute/level2/batch/gva_project_013'],
            'code_project_name': 'Airflow_{{dag.dag_id}}_{{ds}}',
@@ -54,8 +79,16 @@ def ad_etl_dag(dag_id='give_me_a_name', override_params={}):
                        override_params)
    print('DAG Args %s' % dag_args)

    # Create the DAG with the dictionary of args
    dag = DAG(**dag_args)
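
    # dag_exit_status is the terminal task of the pipeline: trigger_rule='all_done'
    # lets it run once all of its upstream tasks have finished, whatever their state,
    # and final_status then decides whether the DAG run must be reported as failed.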
    dag_exit_status = PythonOperator(task_id='dag_exit_status',
                                     provide_context=True,
                                     python_callable=final_status,
                                     trigger_rule='all_done',
                                     dag=dag)
    def return_configured_BashOperator(task_id='give_me_a_task_id', ad_task='define_a_task', trigger_rule='all_success'):
        return BashOperator(
            task_id=task_id,
@@ -69,24 +102,30 @@ def ad_etl_dag(dag_id='give_me_a_name', override_params={}):
        )

    ad_tasks = [
        ('local_data_presence', 'data_mining datapresence --resource_file ad_config_file.json', 'all_success'),
        ('normalization_presence', 'data_mining normalizationpresence --resource_file ad_config_file.json', 'all_failed'),
        ('compute_normalization', 'data_mining computenormalization --resource_file ad_config_file.json', 'all_failed'),
        ('spark_etl_to_hdfs', 'data_mining downloaddata --resource_file ad_config_file.json', 'one_success'),
        ('from_hdfs_to_local', 'data_mining cachelocally --resource_file ad_config_file.json', 'all_success'),
    ]
    for atask in ad_tasks:
        globals()[atask[0]] = return_configured_BashOperator(*atask)
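
    # Each ad_tasks tuple is (task_id, data_mining sub-command, trigger_rule); the loop
    # above registers the corresponding BashOperator in globals() under its task_id,
    # so the dependency statements below can refer to the tasks by bare name.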
    # Start checking the local data presence and in case break pipeline
    local_data_presence >> dag_exit_status
    # Otherwise if datapresence fails, check the normalization presence
    local_data_presence >> normalization_presence

    # if missing the normalization compute it and then download data
    normalization_presence >> compute_normalization >> spark_etl_to_hdfs
    # if normalization presence succeeds start immediately downloading data
    normalization_presence >> spark_etl_to_hdfs

    # Finally cache data locally (#FIXME what does it mean?)
    spark_etl_to_hdfs >> from_hdfs_to_local

    from_hdfs_to_local >> dag_exit_status
    # collect_failures >> dag_exit_status
    return dag
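
# A minimal usage sketch (hypothetical module name and dag_id, not part of this commit):
# an Airflow DAG file would import the factory and expose the returned DAG at module
# level so the scheduler can discover it, e.g.
#
#   from etl_dag_factory import ad_etl_dag   # hypothetical module name
#   dag = ad_etl_dag(dag_id='ad_etl_gva_project_013',
#                    override_params={'ad_config': {'hostgroups': ['cloud_compute/level2/batch/gva_project_013']}})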