0

我们的消费者组正在处理 100 多个主题(所有主题只有一个分区,所有 100 个主题的分区 0)

例如,在加载程序处理程序中批量处理。它根据消息计数和时间进行批处理和处理。循环继续进入股票行情,并且由于没有插入,批次大小 = 0,因此没有任何内容得到处理。我们陷入了这个循环。

func (h *handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  claimMsgChan := claim.Messages()
  for {
      select {
          case message, ok := <-claimMsgChan:
                     // insert message in batch and process if batch > 0 
          case <-maxWaitTicker.C:
                    // process if batch > 0 
      }
  }
}

在 100 多个主题中,只有 57 个主题收到了消息added subscription to,其余 43 个主题从未被订阅,因此它们陷入了无限循环,等待消息进入阅读频道。

请建议这是否是预期的行为,是否由于某些并发限制? https://github.com/Shopify/sarama/issues/1897

4

0 回答 0