0

由于pykafka EOL,我们正在迁移到confluent-kafka-python。因为pykafka我们编写了一个详细的脚本,它以以下格式生成输出:

话题 消费群体 抵消
topic_alpha total_messages 100
topic_alpha 消费者_a 10
topic_alpha 消费者_b 25

我想知道是否有一个 Python 代码知道如何为confluent-kafka-python?

小字:有一个关于如何读取每个给定 consumer_group 的偏移量的部分示例。但是,我很难在consumer_group不手动解析的情况下获取每个主题的列表__consumer_offsets

4

1 回答 1

1

用于admin_client.list_groups()获取组列表,并admin_client.list_topics()获取集群中和client.get_watermark_offsets()给定主题的所有主题和分区。

然后为每个消费者组实例化一个具有相应的新消费者group.id,创建一个 TopicPartition 列表来查询已提交的偏移量,然后调用c.committed()以检索已提交的偏移量。从高水位线中减去承诺的偏移量得到

于 2021-03-09T08:48:58.807 回答