0

设置:

  • 120 个 python confluent-kafka 消费者都在订阅同一组主题
  • 8 个不同分区数的主题:1 个主题有 84 个分区,几个主题有 40-50 个分区,其余的有 1-10 个分区。分区总数约为 300 个。

我使用非常标准的订阅代码:

def __init__(self, kafka_broker_list: str, group_id: str, topics: List[str]):
        from confluent_kafka import Consumer
        self._consumer = Consumer({
            'bootstrap.servers': kafka_broker_list,
            'fetch.max.bytes': 50 * 1024 * 1024,  # 50MB
            'auto.offset.reset': 'earliest',
            'group.id': group_id,
            'enable.auto.commit': True
        })
        logging.info(f"Subscribing for topics: {topics}")
        self._consumer.subscribe(topics, on_assign=self._on_assign, on_revoke=self._on_revoke)

问题: 在我开始的 120 个消费者中,只有 84 个(与最大主题的分区数相同)获得分区分配 - 其他人没有任何分区分配,因此保持空闲状态。更糟糕的是,我通常会得到 5 个消费者,分配了 ~ 10 个分区,有些是 8 个,很多是 2-3-4,还有很多消费者只分配了一个分区。我相信订阅的“第一批”消费者会获得最多的主题,直到每个主题的可用分区都用完为止。

问题:

  1. 我阅读了partition.assignment.strategyJava 消费者可用的配置属性,但是我在 Confluent Kafka 客户端中找不到它。那么有没有办法在 Confluent Kafka Python Client 中配置分配策略?
  2. 有没有办法在服务器上设置分区分配策略,或者每个主题或每个组 ID?
  3. 或者是否有不同的方式在所有消费者之间分配负载?

感谢您花时间阅读我的问题:)

4

1 回答 1

0

confluent-kafka python 客户端内部使用 librdkafka 库,它实际上允许配置分配策略。目前支持两种分配策略 - “范围” - 默认一个,以及解决我描述的问题的“循环”。

它是通过将以下配置属性添加到使用者配置来配置的:

'partition.assignment.strategy': 'roundrobin',

此处提供了所有 librdkafka 属性的文档: https ://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

于 2020-07-21T14:28:48.893 回答