2017-07-28 98 views
0

与此问题相关的原始代码可以在here找到。在Airflow中生成多个任务时反向上游/下游关系

我对两个bitshift运算符都感到困惑,并且set_upstream/set_downstream方法在我在DAG中定义的任务循环内工作。当DAG的主执行回路被配置如下:

for uid in dash_workers.get_id_creds(): 
    clear_tables.set_downstream(id_worker(uid)) 

for uid in dash_workers.get_id_creds(): 
    clear_tables >> id_worker(uid) 

图表看起来像这样(字母数字序列是用户ID,这也定义了任务ID ):

enter image description here

当我配置了DAG的主执行循环是这样的:

for uid in dash_workers.get_id_creds(): 
    clear_tables.set_upstream(id_worker(uid)) 

for uid in dash_workers.get_id_creds(): 
    id_worker(uid) >> clear_tables 

图如下所示:

enter image description here

第二个图是我希望/我本来期望的代码的前两个片段有什么根据我阅读的文档生成。如果我想clear_tables到触发我批的数据分析任务,针对不同的用户ID之前先执行我应该指出这是clear_tables >> id_worker(uid)

编辑 - 这里是完整的代码,因为我张贴的最后几个问题已更新,以供参考:

from datetime import datetime 
import os 
import sys 

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator 

import ds_dependencies 

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH') 
if SCRIPT_PATH: 
    sys.path.insert(0, SCRIPT_PATH) 
    import dash_workers 
else: 
    print('Define DASH_PREPROC_PATH value in environmental variables') 
    sys.exit(1) 

ENV = os.environ 

default_args = { 
    'start_date': datetime.now(), 
} 

DAG = DAG(
    dag_id='dash_preproc', 
    default_args=default_args 
) 

clear_tables = PythonOperator(
    task_id='clear_tables', 
    python_callable=dash_workers.clear_db, 
    dag=DAG) 

def id_worker(uid): 
    return PythonOperator(
     task_id=id, 
     python_callable=dash_workers.main_preprocess, 
     op_args=[uid], 
     dag=DAG) 

for uid in dash_workers.get_id_creds(): 
    preproc_task = id_worker(uid) 
    clear_tables << preproc_task 

实施@ LadislavIndra的建议后,我继续有同样的逆转实现位位移操作,以获得正确的依赖关系图。

UPDATE @ AshBerlin-Taylor的回答是这里发生了什么。我认为图形视图和树视图是做同样的事情,但他们不是。下面是id_worker(uid) >> clear_tables看起来像图形视图:

enter image description here

我当然不希望在我的数据的最后一步前的准备程序将删除所有数据表!

回答

3

树状视图是“倒退”怎么你(和我!)它的第一个念头。在您的第一个屏幕截图中,它显示必须在“AAAG5608078M2”运行任务之前运行“clear_tables”。并且DAG状态取决于每个id工作者任务。因此,而不是任务订单,它是状态链的一棵树。如果这是有道理的。

(这听起来有点怪,但它是因为DAG可以分支出来,在分支回来。)

你可能有更好的运气在寻找你的DAG中的图形视图。这个有箭头并以更直观的方式显示执行顺序。 (虽然我现在发现树视图很有用,但从开始就不太清楚)

+0

@ AshBerli-Taylor - 钉上它!使用图形视图的屏幕截图发布更新。 – Aaron

1

翻看你的其他代码,看起来get_id_creds是你的任务,你试图循​​环它,这是创造一些奇怪的互动。

,将工作模式是:在气流

clear_tables = MyOperator() 

for uid in uid_list: 
    my_task = MyOperator(task_id=uid) 
    clear_tables >> my_task 
+0

谢谢@LadislavIndra,但仍然无法正常工作。我要用完整的代码更新这个问题。 – Aaron