2017-10-18 341 views
1

我正在尝试使用ExternalTask​​Sensor并且卡在另一个DAG的任务中,该任务已成功完成。Airflow ExternalTask​​Sensor卡住

这里,第一个DAG“a”完成其任务,然后通过ExternalTask​​Sensor第二个DAG“b”应该被触发。相反,它被卡在a.first_task中。

首先DAG:

import datetime 
from airflow import DAG 
from airflow.operators.python_operator import PythonOperator 

dag = DAG(
    dag_id='a', 
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()}, 
    schedule_interval=None 
) 

def do_first_task(): 
    print('First task is done') 

PythonOperator(
    task_id='first_task', 
    python_callable=do_first_task, 
    dag=dag) 

二DAG:

import datetime 
from airflow import DAG 
from airflow.operators.python_operator import PythonOperator 
from airflow.operators.sensors import ExternalTaskSensor 

dag = DAG(
    dag_id='b', 
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()}, 
    schedule_interval=None 
) 

def do_second_task(): 
    print('Second task is done') 

ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed', 
    external_dag_id='a', 
    external_task_id='first_task', 
    dag=dag) >> \ 
PythonOperator(
    task_id='second_task', 
    python_callable=do_second_task, 
    dag=dag) 

缺少什么我在这里?

回答

1

ExternalTaskSensor假定您依赖于具有相同执行日期的DAG运行中的任务。

这意味着在您的情况下,要求ab需要按照相同的计划运行(例如每天早上9:00或w/e)。

否则在实例化ExternalTaskSensor时,您需要使用execution_deltaexecution_date_fn

这是运营商自身内部的文件,以帮助进一步澄清:

:param execution_delta: time difference with the previous execution to 
    look at, the default is the same execution_date as the current task. 
    For yesterday, use [positive!] datetime.timedelta(days=1). Either 
    execution_delta or execution_date_fn can be passed to 
    ExternalTaskSensor, but not both. 

:type execution_delta: datetime.timedelta 


:param execution_date_fn: function that receives the current execution date 
    and returns the desired execution date to query. Either execution_delta 
    or execution_date_fn can be passed to ExternalTaskSensor, but not both. 

:type execution_date_fn: callable