我想要 1 个主题和 10 个分区。我正在使用 Kafka 的默认配置。我通过该帮助脚本创建了 1 个具有 10 个分区的主题,现在我将要向它生成消息。
问题是,似乎只有 5 个分区可供消费者从中获取数据。
让我们更详细地描述它。
我知道每个分区需要一个消费者线程的常见问题。我希望能够提交每个分区的偏移量,这只有在每个分区的每个消费者连接器有 1 个线程时才有可能(我使用的是高级消费者)。
所以我创建了 10 个线程,在每个线程中我调用 Consumer.createJavaConsumerConnector() 我正在执行此操作
topicCountMap.put("mytopic", 1);
最后我有 1 个迭代器,它消耗来自 1 个分区的消息。
当我这样做 10 次时,我有 10 个消费者,每个分区每个线程的消费者,我可以在每个分区独立提交偏移量,因为如果我在主题映射中放置与 1 不同的数字,我最终会得到超过 1 个该主题的消费者线程因此,如果我要使用创建的消费者实例提交偏移量,它将为所有不需要的线程提交它们,因此对于不需要的多个分区。
但问题是,当我使用消费者时,只涉及 5 个消费者,其他线程似乎处于空闲状态,但我不知道为什么。
第一个可能的原因是,即使我有 10 个分区,也只有 5 个分区有消息,所以其他 5 个消费者处于空闲状态,但我不明白当我使用生产者时,消息如何不能均匀地分布在所有分区中。我正在发送大约 100 万条消息,所以如果说它们是均匀分布的,那么每个分区都必须至少包含一些消息。
// 编辑
我设法在一个主题中创建了 10 个分区,但我只有 7 个消费者。这对我来说简直是个奇迹。
问题是我正在循环中创建这些消费者线程。所以我启动第一个线程(提交给执行器服务),然后是另一个,然后是另一个,依此类推。
所以场景是第一个消费者获得所有 10 个分区,然后第二个连接,所以它在这两个之间拆分为 5 和 5(或类似的东西),然后其他线程正在连接。
我将其理解为所有消费者之间的分区重新平衡,因此它在某种意义上表现良好,如果创建更多消费者,则这些消费者之间会发生分区平衡,因此每个消费者都应该有一些分区可以操作。
但从结果中我看到只有 7 个消费者,根据消费的消息,它们似乎被分割为 3,2,1,1,1,1,1 分区方式。是的,这 7 个消费者覆盖了所有 10 个分区,但是为什么超过 1 个分区的消费者不进行拆分并将分区给剩下的 3 个消费者呢?
我非常想知道剩余的 3 个线程发生了什么,以及为什么它们不从分配了超过 1 个分区的消费者那里“抓取”分区。