0
我试图将PCollection<String>
聚合成PCollection<List<String>>
,每个大约60个元素。Google Dataflow“elementCountExact”聚合
它们将被发送到一个接受每个请求60个元素的API。 目前我正在尝试通过开窗,但只有elementCountAtLeast,所以我必须将它们收集到一个列表中,并再次计数并拆分,以防它太长。这是相当繁琐,导致很多名单只有几个要素:
Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(maxNrOfelementsPerList),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1)))))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
.apply("CollectIntoLists", Combine.globally(new StringToListCombinator()).withoutDefaults())
.apply("SplitListsToMaxSize", ParDo.of(new DoFn<List<String>, List<String>>() {
@ProcessElement
public void apply(ProcessContext pc) {
splitList(pc.element(), maxNrOfelementsPerList).forEach(pc::output);
}
}));
有直接和更一致的方式做到这一点的聚集?
非常感谢,我会尽快尝试,并标记您的答案。 – Chrisport