2017-04-11 41 views
0

我与过去的赛事时间Datastream的 - TimeWindow为过去的日期

01-12-2015 01:10:10 
01-12-2015 01:10:20 
01-12-2015 01:10:30 
01-12-2015 01:10:40 
.... (millions of records) 

我想申请timeWindow这个timeWindow(Time.seconds(30))

我可以有一个TimeExtractor类来获取数据的EventTime数据集。但我如何实施getCurrentWatermark方法。它应该得到过去的日期和时间

回答

1

在你的情况下,最好使用提供的TimeStampAssigners之一见here

所以,我建议是这样的:

DataStream<MyEvent> stream = ... 

DataStream<MyEvent> withTimestampsAndWatermarks = 
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() { 

    @Override 
    public long extractAscendingTimestamp(MyEvent element) { 
     return element.getCreationTime(); 
    } 
}); 

还记得设置适当的TimestampCharacteristic

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
+0

舷窗........ – madhairsilence