2016-09-15 74 views
3

如何从特定日期的卡夫卡群集中获取消息或数据。例如9月13日,任何人都可以为我提供代码。我用Google搜索了一下,发现只有理论,但我想要的代码从Kafka中检索基于时间戳的数据

回答

10

没有访问方法这一点。此外,在卡夫卡v0.10消息没有包含任何时间戳信息之前,因此,不可能知道消息何时被写入主题。

从Kafka v0.10开始,每条消息都包含一个元数据时间戳属性,该属性或者由生产者在消息创建时设置,或者由代理在消息插入时设置。基于时间的索引已计划,但尚未提供。因此,您需要消耗整个主题并检查时间戳字段(并忽略您不感兴趣的所有消息)。为了找到开始,您还可以对偏移和时间戳进行二进制搜索以更快地找到第一条消息。

更新:

Kakfa 0.10.1增加了一个基于时间的索引。它允许seek与时间戳等于或大于给定时间戳的第一条记录。您可以通过KafkaConsumer#offsetsForTime()使用它。这将返回相应的偏移量,您可以将它们送入KafkaConsumer#seek()。您只需使用数据并通过ConsumerRecord#timestamp()检查记录时间戳字段以查看何时可以停止处理。

请注意,该数据是由偏移严格排序,但不是由时间戳。因此,在处理过程中,您可能会得到比您的开始时间戳小时间戳的“迟到”记录(尽管您可以简单地跳过这些记录)。

一个比较棘手的问题是在你的搜索间隔结束迟到纪录,但。在获得第一个时间戳并且搜索间隔时间较长的时间戳之后,可能还会有一些带有时间戳记的记录,这些记录稍后将成为搜索间隔的一部分(如果这些记录已附加到主题“late”)。虽然没有办法知道。因此,您可能希望继续阅读“更多”记录并检查是否存在“迟到”记录。 “一些记录”意味着多少,是您自己需要做出的设计决策。

没有一般原则,但 - 如果你对你的“写入模式”额外的知识,它可以帮助定义一个很好的策略,给你后你的搜索间隔“结束”多少条记录要消费。当然有两种默认策略:(1)停留在第一条记录的时间戳比搜索间隔更长(并且有效地忽略任何迟到的记录 - 如果使用“日志追加时间”配置,这当然是安全策略); (2)你读到日志的末尾 - 这是关于完整性的最安全的策略,但可能导致过高的开销(还要注意,随着记录可以随时附加,并且如果记录“延迟”可以是任意大的,迟到的记录甚至可能在你到达日志结束后追加)。

在实践中,这可能是一个好主意,考虑一个“最大预期延迟”和读取,直到你得到一个纪录比束缚,这延迟上更大的时间戳。

+0

*“作为卡夫卡v0.10每个消息包含元数据的时间戳属性,要么通过消息创建时间生产者设置,或通过邮件插入时间的经纪人。” * 你能解释一下如何时间戳是由制片人定义的?有没有办法让服务器始终提供时间戳? –

+1

'ProducerRecord'对其构造函数有多个重载;有些接受时间戳参数(long类型)。对于经纪商端时间戳,您需要更改相应的主题配置文件“message.timestamp.type”,参见https://kafka.apache.org/documentation/#topicconfigs –