0

我在写一个从Pub/Sub读取的Dataflow(Beam SDK 2.0.0),计算窗口中的元素,然后将计数作为时间序列存储在BigTable中。窗户固定时间为1分钟。云数据流窗口触发器覆盖从关闭窗口中的值

我的意图是使用触发器每秒更新当前窗口的值,以获得当前时间窗口的实时更新。

但这似乎并不奏效。该值每秒都会正确更新,但一旦Dataflow在下一分钟开始工作时,第一个更新为零。所以基本上只有我的最后一个值是正确的,其余的都是零。

Pipeline pipeline = Pipeline.create(options); 

PCollection<String> live = pipeline 
     .apply("Read from PubSub", PubsubIO.readStrings() 
     .fromSubscription("projects/...")) 
     .apply("Window per minute", 
      Window 
       .<String>into(FixedWindows.of(Duration.standardMinutes(1))) 
       .triggering(Repeatedly 
        .forever(AfterProcessingTime 
         .pastFirstElementInPane() 
         .plusDelayOf(Duration.standardSeconds(1)))           
        .orFinally(AfterWatermark.pastEndOfWindow())) 
       .accumulatingFiredPanes() 
       .withAllowedLateness(Duration.ZERO) 
      ); 

我试着玩触发器代码,但没有任何帮助。我现在唯一的选择是删除整个.trigger块。有没有人遇到类似的行为?

回答

0

向Google报告我的问题后,他们在Beam SDK中发现了一些导致此问题的问题。这些链接的更多细节:

当酸性氧化电位水和GC定时器火起来(非零允许迟到),我们不能不注意到,这是最后的窗格:如果他们与https://issues.apache.org/jira/browse/BEAM-2505

处理时间计时器无法正常忽视所述GC定时器:https://issues.apache.org/jira/browse/BEAM-2502

处理时间定时器只是解释为GC定时器,完全从不同时间域不正确地比较时间戳:https://issues.apache.org/jira/browse/BEAM-2504