2017-10-13 211 views
2

在卡夫卡0.11中添加了Headers到记录(ProducerRecord & ConsumerRecord),在使用Kafka Streams处理主题时是否可以获取这些标题?当在KStream调用诸如map方法提供了key和记录的value的论点,但没有办法,我可以看到访问headers。如果我们可以通过map而不是ConsumerRecord这将是很好的。是否可以使用Kafka Streams访问消息标题?

ex。

KStreamBuilder kStreamBuilder = new KStreamBuilder(); 
KStream<String, String> stream = kStreamBuilder.stream("some-topic"); 
stream 
    .map((key, value) -> ...) // can I get access to headers in methods like map, filter, aggregate, etc? 
    ... 

像这样的工作:

KStreamBuilder kStreamBuilder = new KStreamBuilder(); 
KStream<String, String> stream = kStreamBuilder.stream("some-topic"); 
stream 
    .map((record) -> { 
     record.headers(); 
     record.key(); 
     record.value(); 
    }) 
    ... 

回答

5

记录头是目前不流API进行访问。有一个JIRA增加,虽然这一功能:通过https://issues.apache.org/jira/browse/KAFKA-5632

在当前卡夫卡(0.11和1.0,这将是释放不久)就可以通过处理器API访问记录元数据(即,通过transform()transformValues(),或process())给出“上下文”对象。它公开主题,分区,偏移量和时间戳。 Cf https://docs.confluent.io/current/streams/developer-guide.html#applying-processors-and-transformers-processor-api-integration

元数据在DSL级别上不可用。但是,还有一些工作正在进行中,以扩展DSL:https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams

+0

为了阐明Matthias所说的话:是的,Kafka Streams中的Processor API可以访问记录元数据,如主题名称,分区号,偏移量等。 Kafka Streams中的DSL不允许您访问。但是,因为你可以将处理器API和DSL,你仍然可以编写使用DSL的'变换()'或'transformValues()'功能,它允许您通过访问记录元数据的基于DSL的流处理应用在处理器API的处理器/变换器中。 –

+0

感谢大家提供的信息,我会密切关注元数据添加到DSL级别的情况,以便可以更新此答案。 –

+0

@ MatthiasJ.Sax和@ MichaelG.Noll:在https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams,对于'RecordContext'建议,它似乎没有暴露头部。那是会添加的东西吗? –

相关问题