2016-09-30 89 views
2

我试图在Flink中创建我的第一个实时分析作业。这种方法就像kappa-architecture-like一样,所以我在Kafka上有我的原始数据,我们收到任何实体状态变化的消息。使用Flink计算流状态实体的最新状态

,以便邮件的形式为:

(id,newStatus, timestamp) 

我们要计算,为每一个时间窗口,在给定的状态的项目数。所以输出应该是这样的形式:

(outputTimestamp, state1:count1,state2:count2 ...) 

或等同物。在任何给定时间,这些行应包含处于给定状态的项目的计数,其中与Id关联的状态是针对该ID观察到的最近消息。在任何情况下都应该计算一个id的状态,即使这个事件比处理的时间要早​​。所以所有计数的总和应该等于系统中观察到的不同ID的数量。以下步骤可能会在一段时间后忘记最后一个项目中的项目,但目前这不是一个严格的要求。

这将写入elasticsearch然后查询。

我尝试了很多不同的路径,没有一个完全满足要求。使用滑动窗口,我可以很容易地实现预期的行为,除了当滑动窗口的开始超过事件的时间戳时,正如您所期望的那样,它丢失了数量。其他方法在处理积压时未能保持一致,因为我在键数据和时间戳方面做了一些技巧,这些技巧在数据一次处理完成时失败。

所以我想知道,即使在高层次,我该如何解决这个问题。它看起来像是一个相对常见的用例,但事实上,给定ID的相关信息必须无限期地保留以正确计数实体会产生很多问题。

回答

3

我想我对你的问题的解决方案:鉴于(id, state, time)一个DataStream作为

val stateUpdates: DataStream[(Long, Int, ts)] = ??? 

您获得实际状态变化如下:

val stateCntUpdates: DataStream[(Int, Int)] = s // (state, cntUpdate) 
    .keyBy(_._1) // key by id 
    .flatMap(new StateUpdater) 

StateUpdater是有状态的FlatMapFunction。它有一个键入状态,用于存储每个ID的最后一个状态。对于每个输入记录,它将返回两个状态计数更新记录:(oldState, -1)(newState, +1)。记录(oldState, -1)可确保减少先前状态的计数。

下一页您汇总每个州和窗口状态计数的变化:

val cntUpdatesPerWindow: DataStream[(Int, Int, Long)] = stateCntUpdates // (state, cntUpdate, time) 
    .keyBy(_._1) // key by state 
    .timeWindow(Time.minutes(10)) // window should be non-overlapping, e.g. Tumbling 
    .apply(new SumReducer(), new YourWindowFunction()) 

SumReducer和的cntUpdates和YourWindowFunction分配你的窗口的时间戳。此步骤汇总了窗口中每个状态的所有状态更改。

最后,我们用计数更新调整当前计数。

val stateCnts: DataStream[(Int, Int, Long)] = cntUpdatesPerWindow // (state, count, time) 
    .keyBy(_._1) // key by state again 
    .map(new CountUpdater) 

CountUpdater是一个有状态MapFunction。它有一个键控状态,用于存储每个状态的当前计数。对于每个输入记录,调整计数并发出记录(state, newCount, time)

现在你有一个每个状态都有一个新计数的流(每个状态一个记录)。如果可能,您可以使用这些记录更新您的Elasticsearch索引。如果您需要收集特定时间的所有状态计数,则可以使用窗口来完成。

请注意:此程序的状态大小取决于唯一ID的数量。这可能会导致问题,如果id空间增长非常快。

+0

我正在处理您的建议,非常感谢。我在这里错过的是'YourWindowFunction'应该做的事情。我没有活动时间的概念,所以我不能指定时间戳。此外,这个解决方案似乎正在处理时间,而我关心事件时间。我不能让它运行,但为了我所得到的,这与我所需要的稍有不同。 – Chobeat

+0

这也适用于事件时间。您需要在exec env上设置正确的'TimeCharacteristics'并指定时间戳+水印。唯一的时间依赖操作是窗口。 'YourWindowFunction'指定窗口的时间戳。 'WindowFunction.apply()'有一个'TimeWindow'参数,可以访问窗口的开始和结束时间。请参阅[文档](https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#windowfunction-with-incremental-aggregation)。 –

+0

'TimeCharacteristics'已设置,但我不知道如何为此布局分配时间戳?我应该随时更新时间戳吗?像(状态,计数,时间戳)? – Chobeat