2017-06-15 66 views
0

所以,我有两个不同的KStream就像这样:加入两种不同KStreams

流1:(String键,Object值1)

流2:(String键,Object值2)

我想加入他们,这样我就会看到一个类似于(Object value1,Object value2)的流。

干净的方法是什么?

+0

有几个连接类型,取决于你想要达到的目标。这是在信息文章,可能会帮助你进一步:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics – jvwilge

回答

1

工作的一种方式是加入两个流,使得生成的流的值是包含两个原始值的容器类。然后,映射流以将值从容器中取出并将其中一个用作关键字。

代码:

KStream<String, Object> stream1; 
KStream<String, Object> stream2; 

KStream<Object, Object> joinedStream = stream1 
     .join(stream2, (value1, value2) -> new MyValueContainer(value1, value2)) 
     .map((key, container) -> new KeyValue<Object, Object>(container.getValue1(), container.getValue2())); 
+0

在连接我认为你的意思是写新的MyValueContainer(value1,value2 )。 –

+0

@MichalBorowiecki我的意思是这样做 - 谢谢指出 –