如何从特定日期的卡夫卡群集中获取消息或数据。例如9月13日,任何人都可以为我提供代码。我用Google搜索了一下,发现只有理论,但我想要的代码从Kafka中检索基于时间戳的数据
回答
没有访问方法这一点。此外,在卡夫卡v0.10
消息没有包含任何时间戳信息之前,因此,不可能知道消息何时被写入主题。
从Kafka v0.10
开始,每条消息都包含一个元数据时间戳属性,该属性或者由生产者在消息创建时设置,或者由代理在消息插入时设置。基于时间的索引已计划,但尚未提供。因此,您需要消耗整个主题并检查时间戳字段(并忽略您不感兴趣的所有消息)。为了找到开始,您还可以对偏移和时间戳进行二进制搜索以更快地找到第一条消息。
更新:
Kakfa 0.10.1
增加了一个基于时间的索引。它允许seek
与时间戳等于或大于给定时间戳的第一条记录。您可以通过KafkaConsumer#offsetsForTime()
使用它。这将返回相应的偏移量,您可以将它们送入KafkaConsumer#seek()
。您只需使用数据并通过ConsumerRecord#timestamp()
检查记录时间戳字段以查看何时可以停止处理。
请注意,该数据是由偏移严格排序,但不是由时间戳。因此,在处理过程中,您可能会得到比您的开始时间戳小时间戳的“迟到”记录(尽管您可以简单地跳过这些记录)。
一个比较棘手的问题是在你的搜索间隔结束迟到纪录,但。在获得第一个时间戳并且搜索间隔时间较长的时间戳之后,可能还会有一些带有时间戳记的记录,这些记录稍后将成为搜索间隔的一部分(如果这些记录已附加到主题“late”)。虽然没有办法知道。因此,您可能希望继续阅读“更多”记录并检查是否存在“迟到”记录。 “一些记录”意味着多少,是您自己需要做出的设计决策。
没有一般原则,但 - 如果你对你的“写入模式”额外的知识,它可以帮助定义一个很好的策略,给你后你的搜索间隔“结束”多少条记录要消费。当然有两种默认策略:(1)停留在第一条记录的时间戳比搜索间隔更长(并且有效地忽略任何迟到的记录 - 如果使用“日志追加时间”配置,这当然是安全策略); (2)你读到日志的末尾 - 这是关于完整性的最安全的策略,但可能导致过高的开销(还要注意,随着记录可以随时附加,并且如果记录“延迟”可以是任意大的,迟到的记录甚至可能在你到达日志结束后追加)。
在实践中,这可能是一个好主意,考虑一个“最大预期延迟”和读取,直到你得到一个纪录比束缚,这延迟上更大的时间戳。
- 1. 使用Mysql从数据库中检索时间戳数据
- 2. 基于时间戳
- 3. 从sql数据库中删除。基于时间戳(cron job)?
- 4. mysql查询基于不同的键和时间戳检索行
- 5. 从数据库检索时间戳记录
- 6. 从hbase行检索时间戳
- 7. 结合基于时间戳
- 8. 基于logstash中已转换的unix时间戳值的索引
- 9. 从C#插入后从mongodb中检索不同的时间戳
- 10. 基于时区偏移量的MySql搜索时间戳
- 11. 基于时间的数据
- 12. 从火花流中的kafka消息中提取时间戳吗?
- 13. 基于时间戳R的快速数据填充
- 14. 在基于时间戳的数据流中过滤有界数据
- 15. 通过时间戳检索数据的最有效方法
- 16. 基于散列表中的值从表中检索数据
- 17. 使用Spring JDBC模板从数据库中检索时间戳值
- 18. 基于时间戳的GET查询
- 19. 基于时间戳的git rebase master
- 20. 基于max的Mysql连接(时间戳)
- 21. 更新像基于时间戳的行
- 22. 基于时间戳的grep文件
- 23. MySQL基于时间戳的IF选择
- 24. 从给定时间段检索数据
- 25. neo4j中的基于时间的数据
- 26. 检索YouTube上的时间戳评论
- 27. 检索Artifactory的BuildInfo时间戳性质
- 28. 检索收到消息的时间戳
- 29. 如何从维基数据检索结果中检索维基数据条目?
- 30. 如何从使用PHP的mysql数据库检索并显示时间戳?
*“作为卡夫卡v0.10每个消息包含元数据的时间戳属性,要么通过消息创建时间生产者设置,或通过邮件插入时间的经纪人。” * 你能解释一下如何时间戳是由制片人定义的?有没有办法让服务器始终提供时间戳? –
'ProducerRecord'对其构造函数有多个重载;有些接受时间戳参数(long类型)。对于经纪商端时间戳,您需要更改相应的主题配置文件“message.timestamp.type”,参见https://kafka.apache.org/documentation/#topicconfigs –