我对kafka和kafka-python来说相当陌生。在安装kafka-python之后,我试着从这里简单地实现了用户代码 - http://kafka-python.readthedocs.io/en/master/usage.htmlkafka-python消费者给出的错误
我一直在用kafka的bin目录编写用户代码,并尝试从那里运行python代码。不过,我得到以下错误:
Traceback (most recent call last): File "KafkaConsumer.py", line 4, in for message in consumer: File "/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py", line 559, in next return type(self).next(self) File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 915, in next return next(self._iterator) File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 876, in _message_generator for msg in self._fetcher: File "/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py", line 559, in next return type(self).next(self) File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 520, in next return next(self._iterator) File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 477, in _message_generator for msg in self._unpack_message_set(tp, messages): File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 372, in _unpack_message_set inner_mset = msg.decompress() File "/usr/local/lib/python2.7/dist-packages/kafka/protocol/message.py", line 121, in decompress assert has_snappy(), 'Snappy decompression unsupported' AssertionError: Snappy decompression unsupported
这是我一直在试图运行代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer ('mytopic',bootstrap_servers = ['localhost:9092'], group_id='test-consumer-group')
print "Consuming messages from the given topic"
for message in consumer:
print("%s:%d%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
因为,我真的很新的卡夫卡,我无法理解我做错了什么。
你好,感谢您对本建议。有效。但是,现在突然间代码被卡在“从给定主题消费消息”上。我不知道为什么它不再打印数据。没有做任何改变,它突然停止工作。 –
这似乎是一个单独的问题。有没有未消耗的消息留在主题中消费?您的KafkaConsumer会跟踪其消耗的消息,并且不会消耗已消耗的消息。在循环之前返回开始调用'consumer.seek_to_beginning()'。 – xli
感谢您的建议。我在循环之前尝试了consumer.seek_to_beginning(),但是抛出了一个断言错误。所以,我刚刚删除了kafka主题,订阅了我正在查找的数据并再次启动了kafka。目前,这工作。我将尝试看看如何更改代码,以便从一开始就使用消息。再次感谢您的建议。 –