与此问题相关的原始代码可以在here找到。在Airflow中生成多个任务时反向上游/下游关系
我对两个bitshift运算符都感到困惑,并且set_upstream
/set_downstream
方法在我在DAG中定义的任务循环内工作。当DAG的主执行回路被配置如下:
for uid in dash_workers.get_id_creds():
clear_tables.set_downstream(id_worker(uid))
或
for uid in dash_workers.get_id_creds():
clear_tables >> id_worker(uid)
图表看起来像这样(字母数字序列是用户ID,这也定义了任务ID ):
当我配置了DAG的主执行循环是这样的:
for uid in dash_workers.get_id_creds():
clear_tables.set_upstream(id_worker(uid))
或
for uid in dash_workers.get_id_creds():
id_worker(uid) >> clear_tables
图如下所示:
第二个图是我希望/我本来期望的代码的前两个片段有什么根据我阅读的文档生成。如果我想clear_tables
到触发我批的数据分析任务,针对不同的用户ID之前先执行我应该指出这是clear_tables >> id_worker(uid)
编辑 - 这里是完整的代码,因为我张贴的最后几个问题已更新,以供参考:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
import dash_workers
else:
print('Define DASH_PREPROC_PATH value in environmental variables')
sys.exit(1)
ENV = os.environ
default_args = {
'start_date': datetime.now(),
}
DAG = DAG(
dag_id='dash_preproc',
default_args=default_args
)
clear_tables = PythonOperator(
task_id='clear_tables',
python_callable=dash_workers.clear_db,
dag=DAG)
def id_worker(uid):
return PythonOperator(
task_id=id,
python_callable=dash_workers.main_preprocess,
op_args=[uid],
dag=DAG)
for uid in dash_workers.get_id_creds():
preproc_task = id_worker(uid)
clear_tables << preproc_task
实施@ LadislavIndra的建议后,我继续有同样的逆转实现位位移操作,以获得正确的依赖关系图。
UPDATE @ AshBerlin-Taylor的回答是这里发生了什么。我认为图形视图和树视图是做同样的事情,但他们不是。下面是id_worker(uid) >> clear_tables
看起来像图形视图:
我当然不希望在我的数据的最后一步前的准备程序将删除所有数据表!
@ AshBerli-Taylor - 钉上它!使用图形视图的屏幕截图发布更新。 – Aaron