我正在尝试从 python 读取 Kafka,但收到的消息是 None ,CLI 中没有错误。我通过putty使用端口转发到目标主机,而不是通过telnet测试端口 - 它工作正常。此外,我在 Debian (WSL) 上使用 kafkacat,它工作得很好!
kafkacat -C -b localhost:9092 -t topic1 -p 0 -o beginning -s avro -r http://localhost:28081
我正在使用 PyCharm,我的代码在文本下方。我该如何调试?
from confluent_kafka.avro import AvroConsumer
from confluent_kafka import TopicPartition
from confluent_kafka.avro.serializer import SerializerError
topics = ['topic1', 'topic2']
c = AvroConsumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'smallest',
'schema.registry.url': 'http://localhost:28081',
'api.version.request': True
})
c.subscribe(topics)
tp = TopicPartition(topics[0], 0, 0)
c.assign([tp])
while True:
try:
msg = c.poll(1)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
print('Message None')
continue
if msg.error():
print("AvroConsumer error: {}".format(msg.error()))
continue
print(msg.value())
c.close()
作为