3

我想要 1 个主题和 10 个分区。我正在使用 Kafka 的默认配置。我通过该帮助脚本创建了 1 个具有 10 个分区的主题,现在我将要向它生成消息。

问题是,似乎只有 5 个分区可供消费者从中获取数据。

让我们更详细地描述它。

我知道每个分区需要一个消费者线程的常见问题。我希望能够提交每个分区的偏移量,这只有在每个分区的每个消费者连接器有 1 个线程时才有可能(我使用的是高级消费者)。

所以我创建了 10 个线程,在每个线程中我调用 Consumer.createJavaConsumerConnector() 我正在执行此操作

topicCountMap.put("mytopic", 1);

最后我有 1 个迭代器,它消耗来自 1 个分区的消息。

当我这样做 10 次时,我有 10 个消费者,每个分区每个线程的消费者,我可以在每个分区独立提交偏移量,因为如果我在主题映射中放置与 1 不同的数字,我最终会得到超过 1 个该主题的消费者线程因此,如果我要使用创建的消费者实例提交偏移量,它将为所有不需要的线程提交它们,因此对于不需要的多个分区。

但问题是,当我使用消费者时,只涉及 5 个消费者,其他线程似乎处于空闲状态,但我不知道为什么。

第一个可能的原因是,即使我有 10 个分区,也只有 5 个分区有消息,所以其他 5 个消费者处于空闲状态,但我不明白当我使用生产者时,消息如何不能均匀地分布在所有分区中。我正在发送大约 100 万条消息,所以如果说它们是均匀分布的,那么每个分区必须至少包含一些消息。

// 编辑

我设法在一个主题中创建了 10 个分区,但我只有 7 个消费者。这对我来说简直是个奇迹。

问题是我正在循环中创建这些消费者线程。所以我启动第一个线程(提交给执行器服务),然后是另一个,然后是另一个,依此类推。

所以场景是第一个消费者获得所有 10 个分区,然后第二个连接,所以它在这两个之间拆分为 5 和 5(或类似的东西),然后其他线程正在连接。

我将其理解为所有消费者之间的分区重新平衡,因此它在某种意义上表现良好,如果创建更多消费者,则这些消费者之间会发生分区平衡,因此每个消费者都应该有一些分区可以操作。

但从结果中我看到只有 7 个消费者,根据消费的消息,它们似乎被分割为 3,2,1,1,1,1,1 分区方式。是的,这 7 个消费者覆盖了所有 10 个分区,但是为什么超过 1 个分区的消费者不进行拆分并将分区给剩下的 3 个消费者呢?

我非常想知道剩余的 3 个线程发生了什么,以及为什么它们不从分配了超过 1 个分区的消费者那里“抓取”分区。

4

1 回答 1

0

当我(意外地)在通过管理脚本创建主题之前以编程方式访问主题时,我看到了类似的行为。在这种情况下,分区数以及其他主题配置设置默认为 broker.config 中的值

于 2015-07-12T22:15:10.550 回答