1

鉴于:我在卡夫卡有两个主题让我们说主题A和主题B.卡夫卡流从主题A读取记录,处理它并产生多个记录(比如说recordA和recordB)对应于消耗的记录。现在,问题是如何使用Kafka Streams来实现这一点。卡夫卡流:一个记录到多个记录

KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() { 
     @Override 
     public List<Message> apply(final Message message) { 
      return consumerRecordHandler.process(message); 
     } 
    }).*someFunction*() 

这里,读取的记录是消息;处理后返回消息列表。我怎样才能将这个列表分成两个生产者流?任何帮助将不胜感激。

回答

5

我不是如果我正确理解了这个问题,并且我也不理解@Abhishek的答案:(

如果您有输入流,并且您希望为每个输入记录获得零个,一个或多个输出记录,您将应用flatMap()flatMapValues()(具体取决于是否要修改密钥)。

您也在问“如何将此列表分成两个生产者流?”如果您的意思是将一个流拆分为多个,则可以使用branch()

欲了解更多详情,我指的是文档:http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations

+0

@ user2538255如果我的答案不清楚,请随时跟进。 –

+0

这就是我正在做的。在搜索了Abhishek的答案之后,我登陆了这个例子https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/ confluent/examples/streams/WordCountLambdaIntegrationTest.java – user2538255

+0

已经接受了正确的答案:)谢谢:) – user2538255

2

你的钥匙是什么?我猜测它不是String。执行mapValues后,您将拥有这个 - KStream<K,List<Message>>。如果K不是String然后someFunction()可以是mapK转换成String(如果是,你已经有了结果),并离开List<Message>(值)不变,因为这是你想要的最终结果

+0

是啊,这works..thanks一吨! – user2538255