2016-11-24 170 views
7

我有两个Kafka主题将来自不同来源的完全相同的内容进行流式传输,因此如果其中一个来源发生故障,我可以获得高可用性。 我试图将2个主题合并为1个输出主题,使用Kafka Streams 0.10.1.0,这样我就不会错过关于失败的任何消息,并且在所有源都启动时没有重复。合并多个相同的Kafka Streams主题

当使用KStream的leftJoin方法时,其中一个主题可以顺利进行(次要主题),但当主要主题关闭时,不会向输出主题发送任何内容。这似乎是因为,根据Kafka Streams developer guide

KStream-KStream leftJoin总是由主气流到达记录驱动

因此,如果没有记录从主流到来,即使它们存在,也不会使用辅助流中的记录。主流重新联机后,输出恢复正常。

我也使用outerJoin(添加重复记录),接着转换到KTable和groupByKey摆脱重复的,

KStream mergedStream = stream1.outerJoin(stream2, 
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1, 
    JoinWindows.of(2000L)) 

mergedStream.groupByKey() 
      .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore)) 
      .toStream((key,value) -> value) 
      .to(outputStream) 

尝试,但我还是在一段时间得到重复一次。我还使用commit.interval.ms=200来让KTable经常发送到输出流。

什么是最好的方式来处理这种合并从多个相同的输入主题完全一次输出?

+0

一般来说,我会推荐Processor API来解决这个问题。您也可以尝试切换到当前的“中继”版本(不确定这是否适用于您)。连接被重写了,它可以解决你的问题:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics新的连接语义将被包含在卡夫卡0.10.2中目标发布日期为2017年1月(https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan)。 –

+0

@ MatthiasJ.Sax我切换到了trunk,看起来'leftJoin'现在像KStream-KStream连接的'outerJoin'一样,所以我想我会回到10.1的语义。我现在正在尝试的是创建一个虚假的流,输出null值,我将它用作主要的leftJoin中的主要元素,并在与辅助元素的leftJoin中使用该合并。我希望这会导致始终在主流中存在值,即使我的主服务器已关闭(因为我只会从第一个左连接中获得空值)。 –

+0

新的'leftJoin'确实触发了双方的旧'outerJoin'也(我想这就是你的意思,“现在看起来像leftJoin现在像一个外部连接”?) - 这比旧的'leftJoin'更接近于SQL语义 - 但是'leftJoin'仍然不同于'outerJoin':如果右侧触发并且没有找到联合伙伴,它将丢弃该记录并且不发射结果。 –

回答

5

使用任何种类的连接都不能解决您的问题,因为您总是会以丢失结果(内连接以防某些流停顿)或“重复”null(左连接或外连接两个流都在线)。请参阅https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics了解有关Kafka Streams中连接语义的详细信息。

因此,我会建议使用处理器的API,你可以使用KStreamprocess()transform(),或者transformValues() DSL混合和匹配。有关更多详细信息,请参阅How to filter keys and value with a Processor using Kafka Stream DSL

您还可以将自定义存储添加到您的处理器(How to add a custom StateStore to the Kafka Streams DSL processor?)以进行重复过滤容错。

相关问题