2017-06-06 133 views
1

我有flink流,我在一些时间窗口上说几件事说30秒。闪烁流窗口触发器

这里发生了什么,它给了我结果我汇总以前的窗口以及。

说前30秒,我得到的结果10

下一个THIRY秒我要新鲜的结果,而不是我得到的最后一个窗口结果+新 等。

所以我的问题是我如何获得每个窗口的新鲜结果。

+1

你可能会给我们一个你的代码的想法。 – ImbaBalboa

回答

2

您需要使用purging触发。你想要的是FIRE_AND_PURGE(发射和删除窗口内容),默认的flink触发器是FIRE(发射和保持窗口内容)。

input 
    .keyBy(...) 
    .timeWindow(Time.seconds(30)) 

    // The important part: Replace the default non-purging ProcessingTimeTrigger 
    .trigger(new PurgingTrigger[..., TimeWindow](ProcessingTimeTrigger)) 

    .reduce(...) 

要更深入的解释看看为TriggersFIRE vs FIRE_AND_PURGE

触发器确定何时窗口(由窗口分配器形成)已准备好由窗口函数处理。每个WindowAssigner都带有一个默认触发器。如果默认触发器不符合您的需求,则可以使用触发器(...)指定自定义触发器。

触发器触发时,它可以是FIRE或FIRE_AND_PURGE。尽管FIRE 保留了窗口的内容,但FIRE_AND_PURGE 删除了其内容。默认情况下,预先实现的触发器只是简单地FIRE而不清除窗口状态。