2017-04-05 199 views

回答

0

你会不会seekToEnd()到日志的结尾。

请记住,您首先需要订阅一个主题,然后才能寻找。另外,订阅是懒惰的。因此,您也需要添加一个“虚拟轮询”,然后才能查找。

consumer.subscribe(...) 
consumer.poll() // dummy poll 
consumer.seekToEnd() 

// now enter your regular poll-loop 
1

感谢,

它的作品!

这是我的代码的简化versione:

consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True) 
#dummy poll 
consumer.poll() 
#go to end of the stream 
consumer.seek_to_end() 
#start iterate 
for message in consumer: 
    print(message) 

consumer.close() 

The documentation指出poll()方法是与所述迭代器接口,其中我想不相容是一个我在循环结束时使用我的剧本。然而,从最初的测试来看,这段代码看起来像正常工作。

安全使用它吗?还是我误解了文献?

感谢

0

在回答你的问题在你的答案:

这是我的理解是,当你执行consumer.poll()返回一本字典。所以,当我想轮询信息时,我用循环遍历字典。

consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True) 
messages = consumer.poll() 
data = [] 
for msg in messages: 
    for value in messages[msg]: 
     #Add just the values to the list 
     data.append(value[6]) 

我相信你在做什么,也越来越与consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)迭代器,然后走在迭代器

#start iterate 
for message in consumer: 
    print(message) 

它看起来并不像你实际上得到刚刚从调查的500个结果。您可以通过将max_poll_records=5添加到您的KafkaConsumer配置中进行确认。然后,当您运行代码时,如果打印出超过5条消息,则可以确定您没有使用轮询功能。

希望有帮助!

0

这里是有一个民意调查中以列表的所有邮件的快捷方法:

while True: 
    messages = [] # Store all messages 
    crs = [] # Store all consumer records 
    tpd = consumer.poll(timeout_ms=60000, max_records=1) 
    [ crs.extend(tp) for tp in tpd.values() ] # List of cr's 
    [ messages.extend([json.loads(cr.value)]) for cr in crs ] 
    print messages 
+0

注意的是,这里消息是JSON,但一个可以跳过负载。 –

相关问题