2017-08-31 50 views
0

我有一个关于goolge数据流的问题。Google dataflow 2.0 pubsub处理程序后期数据

我正在写一个数据流管道,它从PubSub读取数据,并写入BigQuery,它的工作。
现在,我必须处理晚期数据,我是继优价一些例子,但它不能正常工作,这里是我的代码:

pipeline.apply(PubsubIO.readStrings() 
      .withTimestampAttribute("timestamp").fromSubscription(Constants.SUBSCRIBER)) 
     .apply(ParDo.of(new ParseEventFn()))   
     .apply(Window.<Entity> into(FixedWindows.of(WINDOW_SIZE)) 
      // processing of late data. 
      .triggering(
        AfterWatermark 
          .pastEndOfWindow() 
          .withEarlyFirings(
            AfterProcessingTime 
              .pastFirstElementInPane() 
              .plusDelayOf(DELAY_SIZE)) 
          .withLateFirings(AfterPane.elementCountAtLeast(1))) 
      .withAllowedLateness(ALLOW_LATE_SIZE) 
      .accumulatingFiredPanes()) 
     .apply(ParDo.of(new ParseTableRow())) 
     .apply("Write to BQ", BigQueryIO.<TableRow>write()... 

这里是我的发布 - 订阅消息:

{ 
..., 
"timestamp" : "2015-08-31T09:52:25.005Z" 
} 

当我用时间戳手动推送一些消息(去PupsubTopic并发布)是< < ALLOW_LATE_SIZE,但这些消息仍然通过。

+0

我真的不明白你的问题。你能否重新说明/编辑它,以便更清楚地知道问题是什么? –

+0

对不起,如果我的解释不清楚。 – Nhjm

+0

我的问题是我在从pubsub调用数据时添加了海关时间戳,但是当我应用触发器获取具有当前事件时间约10分钟的迟到数据时,则所有数据都是在15分钟之前用时间戳手动推送的(或者之前的任何数据更多的时间)仍然收集到ParseTableRow方法中的数据列表中。请帮我解释一下,我的事情没有被解雇,或者我错过了一些处理现代数据的代码?感谢您的任何建议。 – Nhjm

回答

0

您应该使用“Duration”对象正式指定允许的迟到:.withAllowedLateness(Duration.standardMinutes(ALLOW_LATE_SIZE)),假设您已经设置了ALLOW_LATE_SIZE的值(以分钟为单位)。

您可以查看文档page获取“Google云数据流SDK for Java”,特别是“触发器”子章。

+0

嗨乔治,这是按分钟持续时间,我只是定义它是不断使用另一个数据流。 – Nhjm