2017-02-10

Kafka Streams DSL: aggregate, enrich and send through

We have the following problem that we need to solve with Kafka Streams:

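1- Get a message. Each message is tagged with an eventId (the event that the message updates) and a correlationId (unique for each message).

2- Aggregate some state from that message (based on the event ID) and append it to the state that already exists in the local store.

3- Enrich the message with the fully aggregated state for that event and send it through to the output topic.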
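
To make the three steps concrete, here is a minimal sketch of the intended semantics in plain Scala, without Kafka. The types `Msg` and `Enriched`, the `markets` field and the `process` function are hypothetical stand-ins, not the real `StateMessage` or `MarketAggregator` classes; the only point is that every input yields exactly one output carrying the state as of that input.

```scala
object EnrichSketch {
  // Hypothetical, simplified stand-ins for the message types in the question.
  final case class Msg(eventId: String, correlationId: String, markets: Set[String])
  final case class Enriched(correlationId: String, eventId: String, markets: Set[String])

  // Processes messages in arrival order. `store` plays the role of the local
  // state store, keyed by eventId. Returns exactly one enriched record per input.
  def process(messages: Seq[Msg]): Seq[Enriched] =
    messages.foldLeft((Map.empty[String, Set[String]], Vector.empty[Enriched])) {
      case ((store, emitted), m) =>
        // Step 2: merge this message's data into the state for its event.
        val updated = store.getOrElse(m.eventId, Set.empty[String]) ++ m.markets
        // Step 3: emit the message enriched with the state it has just produced.
        (store.updated(m.eventId, updated),
         emitted :+ Enriched(m.correlationId, m.eventId, updated))
    }._2
}
```

With two messages for the same event, the expected result is two outputs: the first with only its own data, the second with the data of both.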

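The gist is that we really cannot lose a single message, and every incoming message must always be enriched with the latest aggregated state (which is itself updated while that same message is being processed).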

From what I have seen so far, we cannot just use a simple aggregation, something like:

stateMessageStream 
    .map((k, v) => new KeyValue[String, StateMessage](k, v)) 
    .mapValues[StateMessageWithMarkets](sm => {StateMessageWithMarkets(Some(sm), extract(sm))}) 
    .groupBy((k, _) => k, stringSerde, marketAggregatorSerde) 
    .aggregate[StateMessageWithMarkets](() => StateMessageWithMarkets(), (_, v, aggregatedState) => aggregatedState.updateModelMarketsWith(v), marketAggregatorSerde, kafkaStoreName) 
    .to(stringSerde, marketAggregatorSerde, kafkaOutTopic) 

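This does not work because the aggregation only emits a new record at intervals, which means that for two incoming messages we might produce only a single aggregated output message (so one message is lost).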
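
The effect described above can be reproduced with a toy model in plain Scala. This is not Kafka Streams code, and `aggregateWithFlush` and `flushEvery` are made-up names; it only mimics an aggregation whose results are buffered per key and forwarded once per flush. As far as I understand, in Kafka Streams the flush is driven by the commit interval and the record cache size rather than by a record count.

```scala
object CacheSketch {
  // Toy model: sums values per key, but forwards results downstream only when
  // a "flush" happens, once per key touched since the previous flush.
  // `flushEvery` is a hypothetical knob: a flush occurs after that many inputs.
  def aggregateWithFlush(updates: Seq[(String, Int)], flushEvery: Int): Seq[(String, Int)] = {
    require(flushEvery > 0, "flushEvery must be positive")
    var totals = Map.empty[String, Int]
    val out = Vector.newBuilder[(String, Int)]
    for (batch <- updates.grouped(flushEvery)) {
      var dirty = Vector.empty[String] // keys updated in this batch, in first-touch order
      for ((key, value) <- batch) {
        totals = totals.updated(key, totals.getOrElse(key, 0) + value)
        if (!dirty.contains(key)) dirty :+= key
      }
      // Flush: only the latest total per dirty key is forwarded.
      dirty.foreach(k => out += (k -> totals(k)))
    }
    out.result()
  }
}
```

Calling `CacheSketch.aggregateWithFlush(Seq("e1" -> 1, "e1" -> 2), flushEvery = 2)` forwards a single record for `e1`, while `flushEvery = 1` forwards one record per input. The first case is the behaviour that loses an intermediate state.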

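My second attempt was basically two streams: one for the aggregation and one for the plain messages. At the end, the two streams can be joined back together with a join operation keyed on correlationId, so that each message is matched with its corresponding state: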

val aggregatedStream : KStream[String, MarketAggregator] = stateMessageStream 
    .map((k, v) => new KeyValue[String, StateMessage](k, v)) 
    .mapValues[StateMessage](v => { 
    log.debug("Received State Message, gameId: " + v.metadata().gtpId() + ", correlationId: " + v.correlationId) 
    v}) 
    .mapValues[MarketAggregator](sm => {MarketAggregator(sm.correlationId, extract(sm))}) 
    .groupBy((k, v) => k, stringSerde, marketAggregatorSerde) 
    .aggregate[MarketAggregator](() => MarketAggregator(), (_, v, aggregatedState) => aggregatedState.updateModelMarketsWith(v), marketAggregatorSerde, kafkaStoreName) 
    .toStream((k, v) => v.correlationId) 

stateMessageStream 
    .selectKey[String]((k, v) => v.correlationId) 
    .leftJoin[MarketAggregator, StateMessageWithMarkets](aggregatedStream, (stateMessage : StateMessage, aggregatedState : MarketAggregator) => StateMessageWithMarkets(Some(stateMessage), aggregatedState.modelMarkets, stateMessage.correlationId), 
     JoinWindows.of(10000), 
     stringSerde, stateMessageSerde, marketAggregatorSerde) 
    .mapValues[StateMessageWithMarkets](v => { 
     log.debug("Producing aggregated State Message, gameId: " + v.stateMessage.map(_.metadata().gtpId()).getOrElse("unknown") + 
      ", correlationId: " + v.stateMessage.map(_.correlationId).getOrElse("unknown")) 
      v 
     }) 
    .to(stringSerde, stateMessageWithMarketsSerde, kafkaOutTopic) 

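However, this does not seem to work either: for two incoming messages I still get only a single message, carrying the latest aggregated state, on the output topic.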

Can someone explain why, and what the correct solution would be?

Answer