2017-01-02 158 views

回答

0

我不知道我是否正确地理解你的问题。但是,我认为有多种方法(取决于你实际想要达到的目标,而不是从问题中得出)。

  1. 使用卡夫卡流DSL(卡夫卡0.10):使用卡夫卡流(Java流处理库),你可以指定一个窗口聚集为任何规模大小
  2. 开拓时间戳(卡夫卡0.10)的翻滚窗口:如果您要使用KafkaConsumer你可以阅读邮件,并通过间隔检查它们的时间戳,块数据
  3. 系统基于时间的(所有版本卡夫卡):刚刚从卡夫卡读取消息,并把传递消息在系统中的时间间隔基地。也就是说,在你处理下一个记录之前,你需要检查你的本地时钟来间隔地发送消息。
+0

我的意思是说,有一个选项可以每隔1分钟从卡夫卡提取数据(例如:在10:01从10:00到10:01读取所有记录,在10:02读取所有记录从10:01到10:02等),而不是在运行时获取新记录? 我想读取数据进行处理每个给定的时间间隔,而不是将数据保存在内存中,直到处理。 – user7365161

+0

由于Kafka是基于拉的,因此没有内置的支持。您需要使用建议的方法之一将此逻辑放入客户端。如果我正确理解了你的评论,则可能需要结合使用方法(3)以在开始poll()之前获取当前的日志结束偏移量,并且仅将消费者消息转换为获得的偏移量(以避免读取添加的记录开始消费后) –