设置:
- 120 个 python confluent-kafka 消费者都在订阅同一组主题
- 8 个不同分区数的主题:1 个主题有 84 个分区,几个主题有 40-50 个分区,其余的有 1-10 个分区。分区总数约为 300 个。
我使用非常标准的订阅代码:
def __init__(self, kafka_broker_list: str, group_id: str, topics: List[str]):
from confluent_kafka import Consumer
self._consumer = Consumer({
'bootstrap.servers': kafka_broker_list,
'fetch.max.bytes': 50 * 1024 * 1024, # 50MB
'auto.offset.reset': 'earliest',
'group.id': group_id,
'enable.auto.commit': True
})
logging.info(f"Subscribing for topics: {topics}")
self._consumer.subscribe(topics, on_assign=self._on_assign, on_revoke=self._on_revoke)
问题: 在我开始的 120 个消费者中,只有 84 个(与最大主题的分区数相同)获得分区分配 - 其他人没有任何分区分配,因此保持空闲状态。更糟糕的是,我通常会得到 5 个消费者,分配了 ~ 10 个分区,有些是 8 个,很多是 2-3-4,还有很多消费者只分配了一个分区。我相信订阅的“第一批”消费者会获得最多的主题,直到每个主题的可用分区都用完为止。
问题:
- 我阅读了
partition.assignment.strategy
Java 消费者可用的配置属性,但是我在 Confluent Kafka 客户端中找不到它。那么有没有办法在 Confluent Kafka Python Client 中配置分配策略? - 有没有办法在服务器上设置分区分配策略,或者每个主题或每个组 ID?
- 或者是否有不同的方式在所有消费者之间分配负载?
感谢您花时间阅读我的问题:)