我们的消费者组正在处理 100 多个主题(所有主题只有一个分区,所有 100 个主题的分区 0)
例如,在加载程序处理程序中批量处理。它根据消息计数和时间进行批处理和处理。循环继续进入股票行情,并且由于没有插入,批次大小 = 0,因此没有任何内容得到处理。我们陷入了这个循环。
func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
claimMsgChan := claim.Messages()
for {
select {
case message, ok := <-claimMsgChan:
// insert message in batch and process if batch > 0
case <-maxWaitTicker.C:
// process if batch > 0
}
}
}
在 100 多个主题中,只有 57 个主题收到了消息added subscription to
,其余 43 个主题从未被订阅,因此它们陷入了无限循环,等待消息进入阅读频道。
请建议这是否是预期的行为,是否由于某些并发限制? https://github.com/Shopify/sarama/issues/1897