2017-03-08 85 views
3

很多时候,我下载的文件在文件名中有一个日期。在Airflow中使用参数的示例?

csat_surveys_2017_03_05.csv 
03062017_roster.csv 

我的代码单独处理这个问题。

  • 比较在处理的文件清单,其中应该存在的预期日期的日期(根据切片的文件名)(某些日期范围,直到当前日期)
  • 对于每一个我处理文件,添加文件名数据库表和只处理还没有被添加到表

我可以(也应该I)使用气流计划日期,以取代其编写这个逻辑需要新的文件吗?每天,我的任务都会按计划进行。我把这个计划的日期(可能减去1天),并将该值作为参数传递,作为要读取的文件名的一部分(在pandas中)。如果是这样,我可以请看一个我可以用作模板的清晰示例吗?

这是一个更好的方法,并且如果文件丢失或延迟了几天会覆盖我(我希望任务失败,然后继续尝试每一天,直到它成功或直到我注意到它并可以向客户提出问题)?

回答

0

我会说是的,使用execution_date可能是最佳做法。

要访问它,您需要一个模板字段。一些默认的运营商有那些已经,或您可能希望创建自己的操作,那么这将是这个样子:

在你DAG,你有任务为:

my_task = MyOperator(
    task_id='t1', 
    filename='prefix_{{ ds }}_suffix') 

ds是气流宏用于访问execution_date参数作为日期的字符串表示形式。

和你MyOperator会是什么样子:

class MyOperator(BaseOperator): 
    template_fields = ('filename') 

    def __init__(self, filename) 
     self.filename = filename 

    def execute(self, context): 
     download_file(self.filename) 
     do_other_stuff() 

你可以找到更多有关如何在宏部分参数化任务https://airflow.incubator.apache.org/code.html#macros