2016-09-23 225 views

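kafka-python consumer gives an error

I am fairly new to Kafka and kafka-python. After installing kafka-python, I tried a simple implementation of the consumer code from here: http://kafka-python.readthedocs.io/en/master/usage.html

I have been writing the consumer code in Kafka's bin directory and trying to run the Python code from there. However, I get the following error: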

Traceback (most recent call last):
  File "KafkaConsumer.py", line 4, in <module>
    for message in consumer:
  File "/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py", line 559, in next
    return type(self).next(self)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 915, in next
    return next(self._iterator)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 876, in _message_generator
    for msg in self._fetcher:
  File "/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py", line 559, in next
    return type(self).next(self)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 520, in next
    return next(self._iterator)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 477, in _message_generator
    for msg in self._unpack_message_set(tp, messages):
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 372, in _unpack_message_set
    inner_mset = msg.decompress()
  File "/usr/local/lib/python2.7/dist-packages/kafka/protocol/message.py", line 121, in decompress
    assert has_snappy(), 'Snappy decompression unsupported'
AssertionError: Snappy decompression unsupported

Here is the code I have been trying to run:

from kafka import KafkaConsumer
consumer = KafkaConsumer('mytopic', bootstrap_servers=['localhost:9092'], group_id='test-consumer-group')
print("Consuming messages from the given topic")
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))

Since I am really new to Kafka, I cannot figure out what I am doing wrong.

Answers

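You appear to be missing python-snappy, which is required to read data compressed in the Snappy format.

You need snappy and snappy-devel, which you can install with yum, apt-get, etc. Then try pip install python-snappy.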
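To confirm that the install worked before rerunning the consumer, a quick import check can help. The sketch below is only an illustration: the helper name snappy_available is made up here, and it relies on the fact that python-snappy is imported as the module snappy. On Debian/Ubuntu the development package is typically named libsnappy-dev rather than snappy-devel.

```python
import importlib


def snappy_available():
    """Return True if the python-snappy bindings can be imported.

    Hypothetical helper for illustration; it only tests the import that
    Snappy decompression in the consumer depends on.
    """
    try:
        importlib.import_module("snappy")
    except ImportError:
        return False
    return True


if __name__ == "__main__":
    if snappy_available():
        print("python-snappy is installed")
    else:
        print("python-snappy is missing; try: pip install python-snappy")
```

If this reports that the module is missing even after pip install, the package was probably installed for a different Python interpreter than the one running the consumer.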

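Hello, thanks for the suggestion. It worked. However, the code now suddenly gets stuck at "Consuming messages from the given topic". I don't know why it no longer prints any data. I made no changes; it just suddenly stopped working.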

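That seems like a separate issue. Are there any unconsumed messages left in the topic? Your KafkaConsumer keeps track of the messages it has already consumed and will not consume them again. To go back to the beginning, call consumer.seek_to_beginning() before the loop. – xli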

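Thanks for the suggestion. I tried consumer.seek_to_beginning() before the loop, but it threw an assertion error. So I just deleted the Kafka topic, re-subscribed to the data I was looking for, and started Kafka again. For now, this works. I will try to see how to change the code so that it consumes messages from the beginning. Thanks again for the suggestion.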
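One possible reason for the assertion error is that seek_to_beginning() expects partitions to be assigned already, and with a topic subscription the assignment may not have happened before the loop starts. Two ways of reading from the start are sketched below, both under assumptions: the function names, the 10-second timeout, the default partition 0, and the broker address are illustrative choices, not something from the thread. The first uses a consumer group with auto_offset_reset='earliest', which only takes effect when the group has no committed offsets (so a fresh group_id is needed). The second assigns a partition manually and then seeks.

```python
def replay_config(group_id, servers=("localhost:9092",)):
    """Build KafkaConsumer keyword arguments for reading from the earliest offset."""
    return {
        "bootstrap_servers": list(servers),
        "group_id": group_id,
        # Used only when the group has no committed offset for a partition.
        "auto_offset_reset": "earliest",
        # Stop iterating after 10 s without new messages instead of blocking.
        "consumer_timeout_ms": 10000,
    }


def consume_from_start(topic, group_id):
    """Print every message in the topic, assuming group_id has not been used before."""
    from kafka import KafkaConsumer  # imported here so the helper above has no dependency

    consumer = KafkaConsumer(topic, **replay_config(group_id))
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (
            message.topic, message.partition, message.offset,
            message.key, message.value))
    consumer.close()


def consumer_at_partition_start(topic, partition=0, servers=("localhost:9092",)):
    """Return a consumer manually assigned to one partition and rewound to its start."""
    from kafka import KafkaConsumer, TopicPartition

    # group_id=None disables group coordination and offset commits.
    consumer = KafkaConsumer(bootstrap_servers=list(servers), group_id=None)
    tp = TopicPartition(topic, partition)
    consumer.assign([tp])           # explicit assignment, no subscription
    consumer.seek_to_beginning(tp)  # partition is assigned, so the seek is valid
    return consumer
```

For example, consume_from_start('mytopic', 'test-consumer-group-replay') would reread the topic without deleting it, provided that group name has never committed offsets.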