所以,我有两个不同的KStream
就像这样:加入两种不同KStreams
流1:(String
键,Object
值1)
流2:(String
键,Object
值2)
我想加入他们,这样我就会看到一个类似于(Object
value1,Object
value2)的流。
干净的方法是什么?
所以,我有两个不同的KStream
就像这样:加入两种不同KStreams
流1:(String
键,Object
值1)
流2:(String
键,Object
值2)
我想加入他们,这样我就会看到一个类似于(Object
value1,Object
value2)的流。
干净的方法是什么?
工作的一种方式是加入两个流,使得生成的流的值是包含两个原始值的容器类。然后,映射流以将值从容器中取出并将其中一个用作关键字。
代码:
KStream<String, Object> stream1;
KStream<String, Object> stream2;
KStream<Object, Object> joinedStream = stream1
.join(stream2, (value1, value2) -> new MyValueContainer(value1, value2))
.map((key, container) -> new KeyValue<Object, Object>(container.getValue1(), container.getValue2()));
在连接我认为你的意思是写新的MyValueContainer(value1,value2 )。 –
@MichalBorowiecki我的意思是这样做 - 谢谢指出 –
有几个连接类型,取决于你想要达到的目标。这是在信息文章,可能会帮助你进一步:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics – jvwilge