2017-06-18 108 views
1

我想分享这个流:1,1,1,2,2,2,2,2,3,3,3,3,3,3,3,0,3,3,3,5,。 ..这些会话:如何与Apache Flink会话流?

1,1,1 
2,2,2,2,2 
3,3,3,3,3,3,3 
0 
3,3,3 
5 

我已经写CustomTrigger检测何时流元素从1到2改变(2到3,3到0等),然后触发触发器。但这不是解决方案,因为当我处理2的第一个元素并触发触发器时,窗口将是[1,1,1,2],但我需要触发1的最后一个元素上的触发器。

这里是我的onElement功能04-0030-03在我的自定义触发类:

override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = { 
    if (prevState == element.value) { 
     prevState = element.value 
     TriggerResult.CONTINUE 
    } else { 
     prevState = element.value 
     TriggerResult.FIRE 
    } 
} 

我怎样才能解决这个问题?

回答

2

我认为FlatMapFunctionListState是实现此用例的最简单方法。

当新元素到达时(即调用flatMap()方法),您检查值是否更改。如果该值没有更改,则将该元素附加到该状态。如果该值发生更改,则会将当前列表状态作为会话发送,请清除该列表,然后将新元素作为第一个列表状态插入。

但是,您应该记住,这假定元素的顺序被保留。 Flink确保在一个分区内,即只要元素不被混洗,并且所有操作员以相同的并行性运行。