2017-04-05 241 views
0

我创建了以下带有动态添加任务的工作流程,但 Airflow 没有把 Task、Join 和 QT 节点添加到 DAG 中。在图形视图中,我只能看到 START 和 END 两个节点。我在这里做错了什么?

Airflow 动态任务创建问题

感谢。

# DAG holding the ddl_* tasks; the cron expression runs it every 5 minutes.
dag = DAG(
    'ddl_ver1',
    schedule_interval='*/5 * * * *',
    default_args=default_args,
)

# Entry and exit marker nodes of the graph (task_ids 'ddl_start' / 'ddl_finish').
start_node = DummyOperator(dag=dag, task_id='ddl_start')
end_node = DummyOperator(dag=dag, task_id='ddl_finish')


def create_qts(account_id, count=7):
    """Create ``count`` dummy QT tasks for ``account_id`` and return them.

    Each task is registered on the module-level ``dag`` with a task_id of
    the form ``ddl_qt_<account_id>_<i>``. No dependencies are set here;
    the caller wires the returned tasks into the graph.

    Args:
        account_id: Value embedded (via ``str()``) in every task_id.
        count: Number of tasks to create (default 7).

    Returns:
        list: The created DummyOperator instances, in index order.
    """
    # Collect the operators themselves so callers can wire them up.
    return [
        DummyOperator(task_id='ddl_qt_' + str(account_id) + "_" + str(i),
                      dag=dag)
        for i in range(count)
    ]



def create_data_discovery_tasks(accounts):
    """Build a fan-out/fan-in sub-graph on ``dag`` for every account.

    For each account this creates ``ddl_task_<id>`` and ``ddl_join_<id>``,
    generates the QT tasks via ``create_qts`` and wires them as::

        start_node -> ddl_task_<id> -> each ddl_qt_<id>_<i> -> ddl_join_<id> -> end_node

    Note: the operators are created inside this function, so nothing is
    added to ``dag`` unless the function is actually called at module level
    (e.g. ``create_data_discovery_tasks([...])``).

    Args:
        accounts: Iterable of account identifiers; each is embedded in the
            generated task_ids via ``str()``.
    """
    for account_id in accounts:
        task = DummyOperator(task_id='ddl_task_' + str(account_id), dag=dag)
        join = DummyOperator(task_id='ddl_join_' + str(account_id), dag=dag)

        # Fan out from ``task`` to every QT task, then fan back in to ``join``.
        for qt in create_qts(account_id):
            qt.set_upstream(task)
            qt.set_downstream(join)

        # Attach the per-account sub-graph to the module-level marker nodes.
        task.set_upstream(start_node)
        join.set_downstream(end_node)

回答

0

按照你现在的写法,create_qts 实际上从未被调用过:它只在 create_data_discovery_tasks 中被调用,而 create_data_discovery_tasks 本身从未被调用。

如果希望生成的任务从开始节点出发,需要调用 set_downstream 或 set_upstream,把生成的任务放在开始节点和结束节点之间。

下面的代码可以正常工作(注意最后一行的调用):

from airflow.models import DAG 
from airflow.operators.dummy_operator import DummyOperator 
from datetime import datetime 

# DAG holding the ddl_* tasks: runs every 5 minutes starting 2017-04-30.
dag = DAG(
    'ddl_ver1',
    start_date=datetime(2017, 4, 30),
    schedule_interval='*/5 * * * *',
)

# Entry and exit marker nodes of the graph (task_ids 'ddl_start' / 'ddl_finish').
start_node = DummyOperator(dag=dag, task_id='ddl_start')
end_node = DummyOperator(dag=dag, task_id='ddl_finish')


def create_qts(account_id, count=7):
    """Create a serial chain of ``count`` dummy tasks between the markers.

    The resulting graph on ``dag`` is::

        start_node -> ddl_qt_<id>_0 -> ... -> ddl_qt_<id>_<count-1> -> end_node

    Args:
        account_id: Value embedded (via ``str()``) in every task_id.
        count: Number of tasks in the chain (default 7).

    Returns:
        list: The created operators, in chain order.

    Raises:
        ValueError: If ``count`` is less than 1 (there would be no last
            task to connect to ``end_node``).
    """
    if count < 1:
        raise ValueError("count must be at least 1")

    qts = []
    previous_qt = None
    for i in range(count):
        qt = DummyOperator(task_id='ddl_qt_' + str(account_id) + "_" + str(i),
                           dag=dag)
        # The first task hangs off the start node; every later task hangs
        # off its predecessor, which makes the tasks run one after another.
        upstream = start_node if previous_qt is None else previous_qt
        upstream.set_downstream(qt)
        qts.append(qt)
        previous_qt = qt

    previous_qt.set_downstream(end_node)
    return qts

# Generate the tasks at import time; without this call nothing is added to the DAG.
create_qts(account_id=123)