在卡夫卡0.11中添加了Headers到记录(ProducerRecord & ConsumerRecord),在使用Kafka Streams处理主题时是否可以获取这些标题?当在KStream
调用诸如map
方法提供了key
和记录的value
的论点,但没有办法,我可以看到访问headers
。如果我们可以通过map
而不是ConsumerRecord
这将是很好的。是否可以使用Kafka Streams访问消息标题?
ex。
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ...) // can I get access to headers in methods like map, filter, aggregate, etc?
...
像这样的工作:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
为了阐明Matthias所说的话:是的,Kafka Streams中的Processor API可以访问记录元数据,如主题名称,分区号,偏移量等。 Kafka Streams中的DSL不允许您访问。但是,因为你可以将处理器API和DSL,你仍然可以编写使用DSL的'变换()'或'transformValues()'功能,它允许您通过访问记录元数据的基于DSL的流处理应用在处理器API的处理器/变换器中。 –
感谢大家提供的信息,我会密切关注元数据添加到DSL级别的情况,以便可以更新此答案。 –
@ MatthiasJ.Sax和@ MichaelG.Noll:在https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams,对于'RecordContext'建议,它似乎没有暴露头部。那是会添加的东西吗? –