2016-11-25 42 views
3

我正在开发一个包含Kafka流的PoC。现在我需要获取流消费者中的偏移值,并使用它为每条消息生成一个唯一的密钥(topic-offset)->hash。原因是:生产者是系统日志,只有少数人拥有ID。我无法在消费者中生成UUID,因为在重新处理的情况下,我需要重新生成相同的密钥。如何获得KStream(汇合平台)中的偏移值

我的问题是:org.apache.kafka.streams.processor.ProcessorContext类暴露了.offset()返回值的方法,但我使用KStream而不是处理器,并且找不到返回相同内容的方法。

任何人都知道如何从Kstream中提取每行的消费者价值? 在此先感谢

+0

另请参阅:http://stackoverflow.com/questions/40814437/how-to-filter-keys-and-value-with-a-processor-using-kafka-stream-dsl –

回答

3

您可以通过process(...),transform(...)transformValues(...)使用混合匹配DSL和处理器API。

它允许您访问类似于普通处理器API的当前记录偏移量。在你的情况下,似乎你想使用KStream#transform(...)

+0

另见:http:/ /stackoverflow.com/questions/40814437/how-to-filter-keys-and-value-with-a-processor-using-kafka-stream-dsl –

+0

即使使用Processor API,我们只能访问键和值,而不能到ConsumerRecord,只有时间戳提取器似乎有ConsumerRecord。你可以添加关于访问partitionId,时间戳,偏移量和其他字段的更多细节。 –

+0

正如问题中所提到的,Processor API通过'Processor#init(...)'方法提供了'ProcessorContext'对象。 'ProcessorContext'在每次调用'process()'之前都会更新,其中包含将要处理的下一条记录的元数据。因此,当调用process()时,你可以通过调用相应的'ProcessorContext'方法来获得记录偏移等。只需在一个类成员变量中使用提供的'ProcessorContext'在'init()'中进行初始化即可。 –