1

我正在尝试通过在 0 0 * * * 每天使用 confluent_kafka Python 包来进行批处理 etl。我知道我的流中有 4 个分区,但它可以更改,所以有没有办法检查特定主题中的分区总数?我的消费者就是这样;

from confluent_kafka import Consumer, KafkaError

    messages = list()
    partition_counter = 0
    tnof_partition = 4

    while True:
        msg = self.consumer.poll(0.1)
        if msg is None:
            continue
        elif not msg.error():
            event = json.loads(msg.value().decode('utf-8'))

        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print("End of partition reached {0}/{1}"
                .format(msg.topic(), msg.partition()))
            
            partition_counter += 1
            if(partition_counter == tnof_partition):
                self.consumer.commit()
                self.consumer.close()
                break

如果您能展示实现批量消费者的替代方法,我也将不胜感激。谢谢

4

1 回答 1

1

消费者的list_topics()方法可以提供最终包含在其中的Topics组成图。TopicMetadatapartitions

参考:https ://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer.list_topics

于 2021-08-09T08:58:40.390 回答