2017-06-13 398 views
6

我很努力去理解Airflow中的BranchPythonOperator是如何工作的。我知道它主要用于分支,但是文档混淆了什么要传入任务以及我需要从上游任务传递/期望什么。Airflow的BranchPythonOperator如何工作?

考虑到文档on this page中的一个简单示例,上游任务run_this_first和下游2个分支的源代码的外观如何? Airflow知道如何运行branch_a而不是branch_b?上游任务的输出在哪里被注意/读取?

回答

7

您的BranchPythonOperator是使用python_callable创建的,这是一个函数。该函数将根据您的业务逻辑返回您已连接的直接下游任务的任务名称。这可能是紧接下游的1到N个任务。没有什么是下游任务HAVE要读取,但是您可以使用xcom传递它们的元数据。

def decide_which_path(): 
    if something is True: 
     return "branch_a" 
    else: 
     return "branch_b" 


branch_task = BranchPythonOperator(
    task_id='run_this_first', 
    python_callable=decide_which_path, 
    trigger_rule="all_done", 
    dag=dag) 

branch_task.set_downstream(branch_a) 
branch_task.set_downstream(branch_b) 

设置trigger_rule或所有其余的将被跳过,因为默认是all_success是很重要的。

+0

对于trigger_rule,这仍然是真的吗?文档不建议你需要,但只是一个虚构的任务,因为其他任务(除了函数返回的那个)将立即下游跳过https://airflow.incubator.apache.org/concepts.html #分支 – Davos

+0

是的,这是正确的,所以它取决于下游任务如何连接。我认为我认为所有的分支都合并到主线任务中,但这可能甚至不是正常的用例(但这是我的正常用例)。 – Nick

+0

您的兴趣爱好是什么?我目前使用的是'这个文件是否存在',如果不存在,继续创建它,否则虚拟任务就会成功退出。它专门用于将一些静态数据(从不会更改)从SQL数据库加载到hadoop。我希望它具有幂等性,并且非常快速地禁止,如果不需要,可以完全避免对源数据库的查询影响。 – Davos