2

我做了一个PCollection,作为BigQuery处理后的管道的结果,现在我想使用与管道分开的那部分数据。如何将PCollection转移到列表,以便我可以遍历它并使用内容。如何将PCollection转移到正常的列表

我从概念上做错了什么?

回答

1

一旦你完成了数据处理做了你的数据流的管道内,你很可能希望将数据写入持久性存储,如在云存储(GCS)文件,BigQuery中的表格,等

然后,您可以使用数据流之外的数据,例如将其读取到列表中。显然,它需要适应这一具体行动的记忆。

1

我会做的是创建“侧输出”(https://cloud.google.com/dataflow/model/par-do),这是您与主流程一起创建的另一个PCollection,因此最终您将拥有2个PCollections,作为您的BQ过程的结果。

只要确保在您的过程函数中创建了添加元素到侧面输出集合的条件。事情是这样的:

public final void processElement(final ProcessContext context) throws Exception { 
    context.output(bqProcessResult); 
    if (condition) { 
    context.sideOutput(myFilterTag, bqProcessResult); 
    } 
} 

这个过程的结果是不是PCollection但PCollectionTuple,所以你只需要做到以下几点:

PCollectionTuple myTuples = previous process using the function above...; 
PCollection<MyType> bqCollection = myTuples.get(bqTag); 
PCollection<MyType> filteredCollection = myTuples.get(myFilterTag);