0

我正在尝试从 python 读取 Kafka,但收到的消息是 None ,CLI 中没有错误。我通过putty使用端口转发到目标主机,而不是通过telnet测试端口 - 它工作正常。此外,我在 Debian (WSL) 上使用 kafkacat,它工作得很好!

kafkacat -C -b localhost:9092 -t topic1 -p 0 -o beginning -s avro -r http://localhost:28081

我正在使用 PyCharm,我的代码在文本下方。我该如何调试?

from confluent_kafka.avro import AvroConsumer
from confluent_kafka import TopicPartition
from confluent_kafka.avro.serializer import SerializerError

topics = ['topic1', 'topic2']
c = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'smallest',
    'schema.registry.url': 'http://localhost:28081',
    'api.version.request': True
})

c.subscribe(topics)
tp = TopicPartition(topics[0], 0, 0)
c.assign([tp])

while True:
    try:
        msg = c.poll(1)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        print('Message None')
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

c.close()

作为

4

1 回答 1

1

我要做的第一件事是确保使用该kafka-avro-console-consumer工具有关于您的主题的消息。

然后在您的应用程序中,您可以尝试提高日志级别:

c = AvroConsumer({
    # ... your config here
    'log_level': 7,
    'debug': 'all',
})

你可以在这里看到不同的参数:https ://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

但我相信您的问题与您分配分区的方式有关。如果您使用subscribe分区,则集群会自动将分区分配给您的使用者。您可以在订阅时添加回调,您可以查看哪些分区已分配给您的消费者,但您不必自己做。请参阅https://docs.confluent.io/3.1.1/clients/confluent-kafka-python/index.html#confluent_kafka.Consumer.subscribe

于 2020-08-13T08:54:37.620 回答