2017-06-13 223 views
1

我可以在PythonOperator中使用宏吗?我试着跟着,但我无法得到渲染的宏!Airflow Python运算符中的宏

dag = DAG(
    'temp', 
    default_args=default_args, 
    description='temp dag', 
    schedule_interval=timedelta(days=1)) 

def temp_def(a, b, **kwargs): 
    print '{{ds}}' 
    print '{{execution_date}}' 
    print 'a=%s, b=%s, kwargs=%s' % (str(a), str(b), str(kwargs)) 

ds = '{{ ds }}' 
mm = '{{ execution_date }}' 

t1 = PythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    op_args=[mm , ds], 
    provide_context=False, 
    dag=dag) 

回答

7

仅对模板化字段进行宏处理。为了让Jinja处理这个领域,请使用您自己的扩展PythonOperator

class MyPythonOperator(PythonOperator): 
    template_fields = ('templates_dict','op_args') 

我加'templates_dict'template_fields因为PythonOperator本身具有该领域模板: PythonOperator

现在,你应该能够在该领域中使用宏:

ds = '{{ ds }}' 
mm = '{{ execution_date }}' 

t1 = MyPythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    op_args=[mm , ds], 
    provide_context=False, 
    dag=dag) 
+1

我们可以标记这是正确的答案吗?因为它是正确的答案 –

+1

为了向后兼容,你可以像这样'template_fields':'template_fields = PythonOperator.template_fields +('op_args',)''。顺便说一句,我打开了一个[JIRA来添加'op_args'和'op_kwargs'到'PythonOperator'模板字段](https://issues.apache.org/jira/browse/AIRFLOW-1814) –

1

在我意见更接近本地的Airflow方式是使用包含的PythonOperator并使用参数provide_context=True

t1 = MyPythonOperator(
    task_id='temp_task', 
    python_callable=temp_def, 
    provide_context=True, 
    dag=dag) 

现在,您可以访问所有的宏,气流元数据和任务参数中的kwargs您可调用

def temp_def(**kwargs): 
    print 'ds={}, execution_date={}'.format((str(kwargs['ds']), str(kwargs['execution_date'])) 

如果你有定义params一些自定义的与您可以访问这些任务相关联以及通过kwargs['params']

+0

这可能是更好的方法正在做。我的回答主要针对宏为什么没有被处理的具体问题。 – jhnclvr