我正在使用kafka-python来使用来自kafka队列(kafka版本0.10.2.0)的消息。特别是我使用KafkaConsumer类型。 如果消费者停下来,经过一段时间后重新启动,我想从最新生成的消息重新启动,即删除消费者关闭期间生成的所有消息。 我怎样才能做到这一点?kafka-python在消费者重新启动后从最后生成的消息中读取
感谢
我正在使用kafka-python来使用来自kafka队列(kafka版本0.10.2.0)的消息。特别是我使用KafkaConsumer类型。 如果消费者停下来,经过一段时间后重新启动,我想从最新生成的消息重新启动,即删除消费者关闭期间生成的所有消息。 我怎样才能做到这一点?kafka-python在消费者重新启动后从最后生成的消息中读取
感谢
你会不会seekToEnd()
到日志的结尾。
请记住,您首先需要订阅一个主题,然后才能寻找。另外,订阅是懒惰的。因此,您也需要添加一个“虚拟轮询”,然后才能查找。
consumer.subscribe(...)
consumer.poll() // dummy poll
consumer.seekToEnd()
// now enter your regular poll-loop
感谢,
它的作品!
这是我的代码的简化versione:
consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
#dummy poll
consumer.poll()
#go to end of the stream
consumer.seek_to_end()
#start iterate
for message in consumer:
print(message)
consumer.close()
The documentation指出poll()方法是与所述迭代器接口,其中我想不相容是一个我在循环结束时使用我的剧本。然而,从最初的测试来看,这段代码看起来像正常工作。
安全使用它吗?还是我误解了文献?
感谢
在回答你的问题在你的答案:
这是我的理解是,当你执行consumer.poll()
返回一本字典。所以,当我想轮询信息时,我用循环遍历字典。
consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
messages = consumer.poll()
data = []
for msg in messages:
for value in messages[msg]:
#Add just the values to the list
data.append(value[6])
我相信你在做什么,也越来越与consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
迭代器,然后走在迭代器
#start iterate
for message in consumer:
print(message)
它看起来并不像你实际上得到刚刚从调查的500个结果。您可以通过将max_poll_records=5
添加到您的KafkaConsumer配置中进行确认。然后,当您运行代码时,如果打印出超过5条消息,则可以确定您没有使用轮询功能。
希望有帮助!
这里是有一个民意调查中以列表的所有邮件的快捷方法:
while True:
messages = [] # Store all messages
crs = [] # Store all consumer records
tpd = consumer.poll(timeout_ms=60000, max_records=1)
[ crs.extend(tp) for tp in tpd.values() ] # List of cr's
[ messages.extend([json.loads(cr.value)]) for cr in crs ]
print messages
注意的是,这里消息是JSON,但一个可以跳过负载。 –