I have a Kafka topic that is receiving binary data (raw packet capture data). Using the Kafka CLI tools, I can confirm that data is indeed arriving, at a rate of several messages per second.

kafka-console-consumer.sh --zookeeper svr:2181 --topic test

But when I use kafka-python, I cannot retrieve any messages. The poll method simply returns no results.

(Pdb) consumer = kafka.KafkaConsumer("test", bootstrap_servers=["svr:9092"])
(Pdb) consumer.poll(5000)
{}

I have been able to use kafka-python to pull messages from a separate topic that contains just text strings.

I am curious if somehow internally kafka-python is dropping the messages because they are binary and failing some sort of validation. How can I dig deeper and see why no messages can be retrieved?

1 Answer

The problem was that the data sent to the topic was compressed with Snappy. All I had to do was install an additional module to handle Snappy.

pip install python-snappy
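
To check up front whether the interpreter running the consumer can actually see the codec, something like the following can help. It is only a sketch: `codec_available` is a hypothetical helper name, and the only assumption about the library is that python-snappy installs an importable module named `snappy`.

```python
import importlib


def codec_available(module_name):
    """Return True if the named compression module can be imported."""
    try:
        importlib.import_module(module_name)
    except ImportError:
        return False
    return True


if __name__ == "__main__":
    # python-snappy provides the "snappy" module; False here means the
    # interpreter that runs the consumer cannot import it.
    print("snappy importable:", codec_available("snappy"))
```

Running this with the same Python interpreter (and virtualenv, if any) as the consumer matters, since the package may be installed for a different one.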

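Unfortunately, with the code I outlined in the question, the consumer simply returns no data instead of telling me that the problem is related to compression.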
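
One general way to dig deeper when `poll` silently returns nothing is to turn on debug logging, since kafka-python logs through the standard `logging` module under logger names that start with `kafka`. The sketch below assumes nothing beyond that; `enable_kafka_debug_logging` is a hypothetical helper name, and whether a codec problem shows up in the log may depend on the library version.

```python
import logging
import sys


def enable_kafka_debug_logging(stream=sys.stderr):
    """Send DEBUG-level records from all "kafka.*" loggers to the given stream."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    logger = logging.getLogger("kafka")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    return logger


# Usage, before creating the consumer from the question:
#   enable_kafka_debug_logging()
#   consumer = kafka.KafkaConsumer("test", bootstrap_servers=["svr:9092"])
#   consumer.poll(5000)
```

The output is verbose, so it is probably best left on only while investigating.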

For comparison, I used the old consumer API, which did report the problem correctly and led me to this solution.

>>> client = kafka.SimpleClient("svr:9092")
>>> consumer.close()
>>> consumer = kafka.SimpleConsumer(client, "group", "test")
>>> for message in consumer:
...     print(message)
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 353, in __iter__
    message = self.get_message(True, timeout)
  File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 305, in get_message
    return self._get_message(block, timeout, get_partition_info)
  File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 320, in _get_message
    self._fetch()
  File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 379, in _fetch
    fail_on_error=False
  File "/usr/lib/python2.7/site-packages/kafka/client.py", line 665, in send_fetch_request
    KafkaProtocol.decode_fetch_response)
  File "/usr/lib/python2.7/site-packages/kafka/client.py", line 295, in _send_broker_aware_request
    for payload_response in decoder_fn(future.value):
  File "/usr/lib/python2.7/site-packages/kafka/protocol/legacy.py", line 212, in decode_fetch_response
    for partition, error, highwater_offset, messages in partitions
  File "/usr/lib/python2.7/site-packages/kafka/protocol/legacy.py", line 219, in decode_message_set
    inner_messages = message.decompress()
  File "/usr/lib/python2.7/site-packages/kafka/protocol/message.py", line 121, in decompress
    assert has_snappy(), 'Snappy decompression unsupported'
AssertionError: Snappy decompression unsupported
Answered 2017-03-15T14:39:18.713