2017-09-26 604 views
3

我有一个流(kafka消息正在流入主题)与flink卡夫卡消费者,我注意到一个有趣的行为,我正在寻找解决。当数据正在流入时,如果它在窗口“完成”之前停止,或者数据结束(在几个窗口之后)并且没有到达窗口的末尾,则流水线的其余部分会不触发。Flink:窗口不处理数据流末尾

实施例流程:

env.addSource(kafkaConsumer) 
     .flatMap(new TokenMapper()) 
     .keyBy("word") 
     .window(TumblingEventTimeWindows.of(Time.seconds(10L))) 
     .reduce(new CountTokens()) 
     .flatMap(new ConvertToString()) 
     .addSink(producer); 

我使用FlinkKafkaConsumer010与TimeCharacteristic设置为EVENTTIME将env。和consumer.assignTimestampsAndWatermarks(新PeriodicWatermarks())

private static class PeriodicWatermarks implements AssignerWithPeriodicWatermarks<String>{ 

    private long currentMaxTimestamp; 
    private final long maxOutOfOrderness; 

    public PeriodicWatermarksAuto(long maxOutOfOrderness){ 
     this.maxOutOfOrderness = maxOutOfOrderness; 
    } 

    @Override 
    public Watermark getCurrentWatermark() { 
     return new Watermark(currentMaxTimestamp - maxOutOfOrderness); 
    } 

    @Override 
    public long extractTimestamp(String t, long l) { 
     // this should be the event timestamp 
     currentMaxTimestamp = l; 
     logger.info("TIMESTAMP: " + l); 
     return l; 
    } 
} 

如果我的窗口是说10秒,我的数据流只包含数据为8秒(然后停止流了一段时间),该flatMap- >接收器不处理,直到新的后数据是在流传输

实施例的数据流处理问题的方法:(每个x是一块每秒 数据)

 xxxxxxxx(8secs)------(gap)--(later more data)xxxxx 
     ^(not processed)   (until I get here)^ 

类似地,如果例如我有35秒的价值流d ata(并且我的窗口再次是10秒)只有3个窗口值得数据触发,而剩余的5秒数据永远不会处理。

 ...xxxxxxxxxx(10secs)xxxxx(5secods)------(gap)--(later more data)xxxxx 
     (processed)  ^(not processed)   (until I get here)^ 

最后,如果我的窗户是10秒,我只在5秒内流传输数据的flatmap->沉从未发生过。

我的问题是,有没有办法触发窗口数据处理,如果我们在一段时间后没有看到数据?

如果我的数据是实时流式传输的,我可以看到存在大量无数据,并且不希望最后一个窗口(可以说只有5秒的数据值)必须等待一些未确定的数据直到新数据进入的时候,我会希望窗口时间过后的最后一个窗口的结果。

大声思考,这似乎是由于使用事件时间而不是ProcessingTime,或者,我的水印不能正确生成最后一个窗口实际触发......不知道也许有点两者?我认为这对任何人都是一个问题,如果你的流结束了最后一位不触发。我会说我可能会发送一个结束消息msg,但是如果这个消息因为消息来源破裂而结束,这并没有帮助。

编辑:所以我改变了处理时间,它确实处理了最后一个窗口中的数据,所以我猜EventTime是罪魁祸首,我正在考虑自定义触发器或适当的窗口水印可能是答案.. 。

感谢您的帮助!

回答

2

我会留下来为后人留下问题,因为我认为这与水印有关。 timestamps和watermaker(来自assignTimestampsAndWatermarks)调用'getCurrentWatermark()',并且由于我将传入实体的水印设置为固定数字(它们的时间戳 - 最大偏移量),它不会更新,直到它看到新的实体。

我的解决方案是某种计时器,如果在可配置的时间内没有看到数据,最终会将水印提前到下一个窗口。我将无法处理非常隐藏的数据,但我不认为这应该是一个问题。这是EventTime处理的预期行为。

+0

您可能希望查看向窗口添加自定义触发器的处理时间。出于这个原因,eventTime窗口经常与自定义处理时间触发器结合使用。 – Jicaar

+0

雅这就是我的想法。我得到了它的解决方法,但我同意自定义触发器将是最好的。 –