0
我有以下简单DAG
:气流任务无法启动时,他们应该是
dag = DAG('test_parallel',
description='Simple tutorial DAG',
schedule_interval=None,
start_date=datetime(2017, 3, 20),
catchup=False)
def first_echo(arg):
print('\n\n')
print('FIRST ECHO! %s' % arg)
def second_echo(arg):
print('\n\n')
print('SECOND ECHO! %s' % arg)
def final_echo():
print('\n\n')
print('FINAL ECHO: ')
final_echo = PythonOperator(task_id='final_echo' , dag=dag, provide_context=False, python_callable=final_echo)
for i in range(5):
first_echo_op = PythonOperator(task_id='first_echo_%s' % i, python_callable=first_echo, op_args=[i], dag=dag)
second_echo_op = PythonOperator(task_id='second_echo_%s' % i, python_callable=second_echo, op_args=[i], dag=dag)
first_echo_op.set_downstream(second_echo_op)
second_echo_op.set_downstream(final_echo)
的想法是,我有一系列的五个独立的任务,每个任务导致了下面的任务,他们都得到汇总成最终任务。
问题是我的second_echo
任务都不会启动,直到所有first_echo
任务完成。由于first_echo
任务都独立和每个second_echo
任务只有依靠以前的独立first_echo
的任务,我会一直以为他们会一直只要有可用的资源这样做时才运行......
如果需要,我可以提供甘特图。
问题是:我如何在DAG运行中尽快建立独立路径,而不是等待所有第一个任务完成,假设我拥有适量的资源。