1
我有3个分区的话题,我想用从每一特定分区阅读下面的代码从使用Python
from kafka import KafkaConsumer, TopicPartition
brokers = 'localhost:9092'
topic = 'b3'
m = KafkaConsumer(topic, bootstrap_servers=['localhost:9092'])
par = TopicPartition(topic=topic, partition=1)
m.assign(par)
但我得到这个错误具体卡夫卡主题阅读:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
kafka.errors.IllegalStateError: IllegalStateError: You must choose only one way to configure your consumer: (1) subscribe to specific topics by name, (2) subscribe to topics matching a regex pattern, (3) assign itself specific topic-partitions.
有人可以帮助我吗?
你怎么会插入超时这里功能?如果没有例如1的新消息出现,我想停止流式传输过程。 –
。看看'poll'功能,它可以让你设置超时。 http://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html?highlight=poll#kafka.KafkaConsumer.poll –
BTW:难道我回答你原来的问题? :-) –