2017-06-14 86 views
0

下一个顺序执行的顺序运行,我已经在那里我指定的需要被顺序运行三个任务DAG的文件(T1 - > T2 - > T3):确保任务(Apache的气流)

default_args = { 
    'owner': 'airflow', 
    'start_date': datetime(2017, 6, 14, 23 , 20), 
    'email_on_failure': False, 
    'email_on_retry': False, 
    } 

dag = DAG('test_dag', default_args=default_args, schedule_interval="*/5 * * * *") 

t1 = BashOperator(
    task_id='form_dataset', 
    bash_command='python 1.py', 
    dag=dag) 

t2 = BashOperator(
    task_id='form_features', 
    bash_command='python 2.py', 
    depends_on_past=True, 
    dag=dag) 

t3 = BashOperator(
    task_id='train', 
    bash_command='python 3.py', 
    depends_on_past=True, 
    dag=dag) 

t2.set_upstream(t1) 
t3.set_upstream(t2) 
t4.set_upstream(t3) 

我假设顺序行为t1-> t2-> t3是一个默认的行为,我认为情况并非如此(顺序非常随机,例如t1 - > t2 - > t2- - > T1 - > T3)。我错过了什么样的论点会纠正这种行为?

回答

1

您需要在文件的末尾添加语句

t1 >> t2 >> t3 

。有关详细信息,请参见以下链接: https://airflow.incubator.apache.org/concepts.html#bitshift-composition

为了完整起见,还可以通过对任务使用set_upstream()或set_downstream()方法来完成此操作。

+0

感谢您的回复,我确实收到了回复(抱歉,我没有将它写入代码中)。我想问题是,所有三个任务的总运行时间大于计划间隔。我想优先执行DAG中的所有任务,而不是运行新的DAG,但到目前为止没有找到相关的属性。 –

+0

每个DAG运行的执行都取决于上次运行的成功完成吗?在这种情况下,您可以查看参数depends_on_true的行为。还有另一个选项可以将每个任务分配给一个池,并将该池的大小限制为1.或者我误解了您的用例?一般来说,dag任务应尽可能独立于其他运行。 – Him

+0

在airflow.cfg中有一个设置max_active_runs_per_dag。将其设置为1还应该防止在前一个完成之前重新开始相同的dag。 – Him