2017-10-05 56 views
1

我写了一个Dataflow作业,当我手动运行它时效果很好。下面是相关部分(为清楚起见移除了一些验证码):数据流模板中的动态bigquery查询

parser.add_argument('--end_datetime', 
        dest='end_datetime') 
known_args, pipeline_args = parser.parse_known_args(argv) 

query = <redacted SQL String with a placeholder for a date> 
query = query.replace('#ENDDATETIME#', known_args.end_datetime) 

with beam.Pipeline(options=pipeline_options) as p: 
    rows = p | 'read query' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True)) 

现在我想创建一个模板,并安排其与动态ENDDATETIME定期运行。据我了解,为了做到这一点,我需要改变add_argument按照本文档add_value_provider_argument:

https://cloud.google.com/dataflow/docs/templates/creating-templates

遗憾的是,似乎ValueProvider值不可用,当我需要他们,他们是唯一可用在管道内部。 (如果我在这里错了,请纠正我......)。所以我有点卡住了。

有没有人有关于如何在数据流模板中获取动态日期到我的查询中的任何指针?

回答

4

Python目前仅支持FileBasedSource IO的ValueProvider选项。您可以通过单击您使用的链接上的Python选项卡来看到: https://cloud.google.com/dataflow/docs/templates/creating-templates

在“流水线I/O和运行时参数”部分下。

+0

是的,但ValueProvider可以在其他非IO方法中使用。例如,该页面上的“在您的函数中使用ValueProvider”示例在ParDo中使用ValueProvider值。所以解决这个问题的一种方法是修改DoFn中的查询变量,但是我没有看到一种方式将修改后的变量返回到主程序,以便可以在随后的管道步骤中使用。 –

+0

与Java中发生的不同,Python中的BigQuery不使用自定义源。换句话说,它没有在SDK中完全实现,但是也包含了后端的部分。目前,只有本地(不是自定义源)可以使用模板。有计划将BigQuery添加为自定义。 –

+0

我的意思是只有自定义源可以使用模板。 (其余是正确的)。我的意思是指定“本地”与“自定义”相反,所以本机来源不能使用模板(Python BigQuery的当前案例)。以下是将BigQuery作为自定义源添加的计划:https://issues.apache.org/jira/browse/BEAM-1440 –