2017-07-06 68 views
0

我有以下简单DAG气流任务无法启动时,他们应该是

dag = DAG('test_parallel', 
     description='Simple tutorial DAG', 
     schedule_interval=None, 
     start_date=datetime(2017, 3, 20), 
     catchup=False) 


def first_echo(arg): 
    print('\n\n') 
    print('FIRST ECHO! %s' % arg) 

def second_echo(arg): 
    print('\n\n') 
    print('SECOND ECHO! %s' % arg) 

def final_echo(): 
    print('\n\n') 
    print('FINAL ECHO: ') 

final_echo = PythonOperator(task_id='final_echo' , dag=dag, provide_context=False, python_callable=final_echo) 

for i in range(5): 
    first_echo_op = PythonOperator(task_id='first_echo_%s' % i, python_callable=first_echo, op_args=[i], dag=dag) 
    second_echo_op = PythonOperator(task_id='second_echo_%s' % i, python_callable=second_echo, op_args=[i], dag=dag) 

    first_echo_op.set_downstream(second_echo_op) 
    second_echo_op.set_downstream(final_echo) 

的想法是,我有一系列的五个独立的任务,每个任务导致了下面的任务,他们都得到汇总成最终任务。

问题是我的second_echo任务都不会启动,直到所有first_echo任务完成。由于first_echo任务都独立和每个second_echo任务只有依靠以前的独立first_echo的任务,我会一直以为他们会一直只要有可用的资源这样做时才运行......

如果需要,我可以提供甘特图。

问题是:我如何在DAG运行中尽快建立独立路径,而不是等待所有第一个任务完成,假设我拥有适量的资源。

回答

0

您的DAG正在按我的环境按预期工作。

如果我添加一个随机延迟的首要任务,迫使一些较早完成,像这样:

def first_echo(arg): 
    time.sleep(random.randint(0, 30)) 
    print('\n\n') 
    print('FIRST ECHO! %s' % arg) 
我得到预期的并行执行模式(使用LocalExecutor)

enter image description here