2017-10-10 254 views
0

我想在短时间内为特定主题启动kafka消费者。具体而言,我希望消费者在产生消息的特定时间延迟之后开始消费来自主题的消息。谁能说有没有任何财产或卡夫卡选项来启用它。提前致谢。Apache kafka - 消费者延迟选项

+0

只要制片人发来'Kafka'是不言而喻的消息到卡夫卡日志,并成为可供消费者消费它。我在卡夫卡文档中没有看到任何属性,它让您可以选择延迟每条消息的使用情况(如果有消息,我会很乐意进一步了解它)。但是,您可以控制消息的消耗,因此与生产者相比,您可以稍微晚点启动消费者。 – Explorer

回答

0

我们为spark-streaming做了同样的事情。我希望,这种方法也适合你。

这个想法很简单 - 使用Thread.sleep。当您收到来自kafka的新消息时,您可以计算处理它之前需要多长时间睡眠。

伪代码的想法:

message = getNextMessageFromKafka() 
sleepMs = Math.max(0, currentTime - message.timestamp) 
Thread.sleep(speepMs) 
do processing 
+0

这个想法非常适合这个问题,我觉得如果每隔1ms就会有一些数据和消费者使用这个主题并且每个1ms的数据都会睡眠,所以这可能是正确的,而另一个处理逻辑不会被称为ryte !!!! –