我有一个流(kafka消息正在流入主题)与flink卡夫卡消费者,我注意到一个有趣的行为,我正在寻找解决。当数据正在流入时,如果它在窗口“完成”之前停止,或者数据结束(在几个窗口之后)并且没有到达窗口的末尾,则流水线的其余部分会不触发。Flink:窗口不处理数据流末尾
实施例流程:
env.addSource(kafkaConsumer)
.flatMap(new TokenMapper())
.keyBy("word")
.window(TumblingEventTimeWindows.of(Time.seconds(10L)))
.reduce(new CountTokens())
.flatMap(new ConvertToString())
.addSink(producer);
我使用FlinkKafkaConsumer010与TimeCharacteristic设置为EVENTTIME将env。和consumer.assignTimestampsAndWatermarks(新PeriodicWatermarks())
private static class PeriodicWatermarks implements AssignerWithPeriodicWatermarks<String>{
private long currentMaxTimestamp;
private final long maxOutOfOrderness;
public PeriodicWatermarksAuto(long maxOutOfOrderness){
this.maxOutOfOrderness = maxOutOfOrderness;
}
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(String t, long l) {
// this should be the event timestamp
currentMaxTimestamp = l;
logger.info("TIMESTAMP: " + l);
return l;
}
}
如果我的窗口是说10秒,我的数据流只包含数据为8秒(然后停止流了一段时间),该flatMap- >接收器不处理,直到新的后数据是在流传输
实施例的数据流处理问题的方法:(每个x是一块每秒 数据)
xxxxxxxx(8secs)------(gap)--(later more data)xxxxx
^(not processed) (until I get here)^
类似地,如果例如我有35秒的价值流d ata(并且我的窗口再次是10秒)只有3个窗口值得数据触发,而剩余的5秒数据永远不会处理。
...xxxxxxxxxx(10secs)xxxxx(5secods)------(gap)--(later more data)xxxxx
(processed) ^(not processed) (until I get here)^
最后,如果我的窗户是10秒,我只在5秒内流传输数据的flatmap->沉从未发生过。
我的问题是,有没有办法触发窗口数据处理,如果我们在一段时间后没有看到数据?
如果我的数据是实时流式传输的,我可以看到存在大量无数据,并且不希望最后一个窗口(可以说只有5秒的数据值)必须等待一些未确定的数据直到新数据进入的时候,我会希望窗口时间过后的最后一个窗口的结果。
大声思考,这似乎是由于使用事件时间而不是ProcessingTime,或者,我的水印不能正确生成最后一个窗口实际触发......不知道也许有点两者?我认为这对任何人都是一个问题,如果你的流结束了最后一位不触发。我会说我可能会发送一个结束消息msg,但是如果这个消息因为消息来源破裂而结束,这并没有帮助。
编辑:所以我改变了处理时间,它确实处理了最后一个窗口中的数据,所以我猜EventTime是罪魁祸首,我正在考虑自定义触发器或适当的窗口水印可能是答案.. 。
感谢您的帮助!
您可能希望查看向窗口添加自定义触发器的处理时间。出于这个原因,eventTime窗口经常与自定义处理时间触发器结合使用。 – Jicaar
雅这就是我的想法。我得到了它的解决方法,但我同意自定义触发器将是最好的。 –