2017-10-28 100 views
0

我有一个将结果写入BigQuery表的Apache Beam/Dataflow管道。然后,我想查询这个表中管道的单独部分。然而,我似乎无法弄清楚如何正确设置这个管道依赖。我编写(然后想要查询)的新表格与一个单独的表格保持连接,用于某些过滤逻辑,这就是为什么我实际上需要编写表格并运行查询。逻辑将如下所示:写完表后的Apache Beam Pipeline查询表

with beam.Pipeline(options=pipeline_options) as p: 
    table_data = p | 'CreatTable' >> # ... logic to generate table ... 

    # Write Table to BQ 
    table_written = table_data | 'WriteTempTrainDataBQ' >> beam.io.WriteToBigQuery(...) 

    query_results = table_written | 'QueryNewTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_new_table)) 

如果query_new_table实际上是一个已经存在的BQ表的查询和我改变query_results = p |,而不是table_written这个工作正常。但是,如果我试图查询我在管道中写入的表,那么我无法让管道步骤“等待”,直到实际生成表。有没有办法做到这一点,我忽略了?

当我尝试顺序执行此步骤时,出现断言错误assert isinstance(pbegin, pvalue.PBegin) AssertionError,这表示table_written是问题,因为它不是有效的PCollection实例。

有没有人知道我会代替table_written来让它实际上按照期望的顺序运行?

回答

2

当前Beam不支持用例“在BigQuery写入完成后执行某些操作”。唯一的解决方法是运行单独的管道:让您的主程序为:运行写入BigQuery的管道;等待管道完成;运行另一个从BigQuery读取的管道。

这是一个非常频繁的请求功能,我们开始设计这种支持(更一般地说,升级各种IO写入来支持排序操作),但我不知道什么时候能完成。

+0

非常有帮助!为了澄清我正确理解这一点,目前的最佳做法是将一个主程序run()与beam.Pipeline(options = pipeline_options)作为p1:'写入BQ,然后写入2 )'用beam.Pipeline(options = pipeline_options)作为p2:'从BQ中读取,还是在主程序中建议实际上有两个独立的'run()'函数? – reese0106

+0

就梁而言,无关紧要:根据你的Python风格,做一些看起来更可读的方法:) – jkff