2017-05-31 64 views
0

我试图将PCollection<String>聚合成PCollection<List<String>>,每个大约60个元素。Google Dataflow“elementCountExact”聚合

它们将被发送到一个接受每个请求60个元素的API。 目前我正在尝试通过开窗,但只有elementCountAtLeast,所以我必须将它们收集到一个列表中,并再次计数并拆分,以防它太长。这是相当繁琐,导致很多名单只有几个要素:

Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast(maxNrOfelementsPerList), 
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))) 
      .withAllowedLateness(Duration.ZERO) 
      .discardingFiredPanes()) 
      .apply("CollectIntoLists", Combine.globally(new StringToListCombinator()).withoutDefaults()) 
      .apply("SplitListsToMaxSize", ParDo.of(new DoFn<List<String>, List<String>>() { 
       @ProcessElement 
       public void apply(ProcessContext pc) { 
        splitList(pc.element(), maxNrOfelementsPerList).forEach(pc::output); 
       } 
      })); 

有直接和更一致的方式做到这一点的聚集?

回答

1

这可以使用Dataflow 2.x中的State API来构建。

基本上,你会编写一个有状态的DoFn,它有两个状态 - 一个元素数量和一个缓冲元素的“袋子”数量。

当一个元素到达时,您将它添加到包中并递增计数。然后你检查计数,如果它是60你输出它,并清除这两个状态。由于Stateful DoFn的每个密钥都将在单台机器上运行,所以随机将这些元素分布到N个密钥中可能会很好,因此您可以扩展到N台机器(多个密钥可以在一台机器上运行) 。

+0

非常感谢,我会尽快尝试,并标记您的答案。 – Chrisport