2

我对 kafka 和 kafka-python 相当陌生。安装 kafka-python 后,我尝试从这里简单地实现消费者代码 - http://kafka-python.readthedocs.io/en/master/usage.html

我一直在 kafka 的 bin 目录中编写消费者代码,并尝试从那里运行 python 代码。但是我收到以下错误:

Traceback(最近一次调用最后一次):文件“KafkaConsumer.py”,第 4 行,用于消费者中的消息:文件“/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py”,第 559 行,在下一个返回类型(self)中。下一个(自我)文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py”,第 915 行,在下一个 返回下一个(self._iterator)文件“/usr/local/lib /python2.7/dist-packages/kafka/consumer/group.py”,第 876 行,在 _message_generator for msg in self._fetcher:文件“/usr/local/lib/python2.7/dist-packages/kafka/vendor /six.py”,第 559 行,在下一个返回类型(self)中。下一个(自我)文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py”,第 520 行, 返回下一个(self._iterator)文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py”,第477行,在self._unpack_message_set(tp,messages)的_message_generator中为msg:文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py”,第 372 行,在 _unpack_message_set inner_mset = msg.decompress() 文件“/usr/local/lib/python2.7 /dist-packages/kafka/protocol/message.py",第 121 行,在解压缩中断言 has_snappy(),'Snappy decompression unsupported' AssertionError: Snappy decompression unsupported

这是我一直试图运行的代码:

from kafka import KafkaConsumer
consumer = KafkaConsumer ('mytopic',bootstrap_servers = ['localhost:9092'], group_id='test-consumer-group')
print "Consuming messages from the given topic"
for message in consumer:
    print("%s:%d%d: key=%s value=%s"  % (message.topic, message.partition, message.offset, message.key, message.value))

因为,我对 Kafka 真的很陌生,所以我很难理解我做错了什么。

4

1 回答 1

4

您似乎缺少 python-snappy,它是读取以 snappy 格式压缩的数据所必需的。

您需要snappyand snappy-devel,您可以使用 yum、apt-get 等安装它。然后尝试pip install python-snappy

于 2016-09-24T02:37:19.287 回答