2017-10-13 73 views
0

我试图通过Machine_1中的python脚本向Machine_2中的卡夫卡主题发送一些消息。 Machine_2Machine_1都在同一个网络中,都是Azure中的虚拟机。Python-Kafka:程序以交互模式运行,而不是以脚本模式运行

代码:sampl.py

from kafka import KafkaProducer 
Producer = KafkaProducer(bootstrap_servers=['Machine_2:9092']) 
Producer.send('test', 'hello') 

如果我运行上面的代码作为

蟒sampl.py

没有消息到达的Machine_2。但是,如果我做的:

蟒蛇-i sampl.py

然后消息到达的Machine_2。我使用kafka-console-consumer.sh进行了相同的检查。我做了yum updateMachine_1认为可能有一些图书馆在这里失踪。但没有运气。

谢谢。

+0

您正在使用什么卡夫卡Python和卡夫卡经纪人版本? –

+0

对于迟到的回复我很抱歉。卡夫卡版本 - -0.10.2.0,pytho kafka模块 - kafka_python-1.3.5。 – wonder

回答

0

这里是kafka-python的维护者。 Producer.send('test', b'hello')是异步的,不提供即时交付。你可能看到的是,在生产者有机会完成网络发送之前,python解释器正在关闭。

如果您希望在完成脚本之前等待消息发送完毕,则应该使用.get(timeout = ...)。因此,尝试:

Producer.send('test', b'hello').get(timeout=1000)

或者交替,你可以调用flush()做同样的,所有未发送的消息:

Producer.flush(timeout=1000)

+0

生产者发送消息的默认周期间隔是多少?如果在超时期限内没有发送,该怎么办? – wonder

+0

kafka-python不使用周期性间隔进行发送。数据可以在内部进行缓冲以创建更大的批次以获得更高的吞吐量,但默认情况下会禁用该数据。有关更多详细信息,请参阅文档:http://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html – dpkp

相关问题