鉴于:我在卡夫卡有两个主题让我们说主题A和主题B.卡夫卡流从主题A读取记录,处理它并产生多个记录(比如说recordA和recordB)对应于消耗的记录。现在,问题是如何使用Kafka Streams来实现这一点。卡夫卡流:一个记录到多个记录
KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
@Override
public List<Message> apply(final Message message) {
return consumerRecordHandler.process(message);
}
}).*someFunction*()
这里,读取的记录是消息;处理后返回消息列表。我怎样才能将这个列表分成两个生产者流?任何帮助将不胜感激。
@ user2538255如果我的答案不清楚,请随时跟进。 –
这就是我正在做的。在搜索了Abhishek的答案之后,我登陆了这个例子https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/ confluent/examples/streams/WordCountLambdaIntegrationTest.java – user2538255
已经接受了正确的答案:)谢谢:) – user2538255