我在写一个从Pub/Sub读取的Dataflow(Beam SDK 2.0.0),计算窗口中的元素,然后将计数作为时间序列存储在BigTable中。窗户固定时间为1分钟。云数据流窗口触发器覆盖从关闭窗口中的值
我的意图是使用触发器每秒更新当前窗口的值,以获得当前时间窗口的实时更新。
但这似乎并不奏效。该值每秒都会正确更新,但一旦Dataflow在下一分钟开始工作时,第一个更新为零。所以基本上只有我的最后一个值是正确的,其余的都是零。
Pipeline pipeline = Pipeline.create(options);
PCollection<String> live = pipeline
.apply("Read from PubSub", PubsubIO.readStrings()
.fromSubscription("projects/..."))
.apply("Window per minute",
Window
.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(Repeatedly
.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.orFinally(AfterWatermark.pastEndOfWindow()))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO)
);
我试着玩触发器代码,但没有任何帮助。我现在唯一的选择是删除整个.trigger
块。有没有人遇到类似的行为?