0
我已经创建了以下具有动态添加任务的工作流程。但Airflow无法将Task
,Join
和QT
节点添加到Dag。在图形表示中,我只能看到START
和END
节点。有什么我在这里做错了。Airflow动态任务创建问题
感谢。
dag = DAG(
'ddl_ver1',
default_args=default_args,
schedule_interval='*/5 * * * *'
)
start_node = DummyOperator(task_id='ddl_start',
dag=dag)
end_node = DummyOperator(task_id='ddl_finish',
dag=dag)
def create_qts(account_id):
qts = []
for i in range(7):
qt = DummyOperator(task_id='ddl_qt_' + str(account_id) + "_" + str(i),
dag=dag)
qts.append(query)
return qts
def create_data_discovery_tasks(accounts):
for account_id in accounts:
task = DummyOperator(
task_id='ddl_task_' + str(account_id),
dag=dag)
join = DummyOperator(
task_id='ddl_join_' + str(account_id),
dag=dag)
qts = create_qts(account_id)
for qt in qts:
qt.set_upstream(task)
qt.set_downstream(join)
task.set_upstream(START)
join.set_downstream(END)