1

我一直在使用 alpakka kafka 从 kafka 主题流式传输数据。我在用着:

Consumer
      .committableSource(consumerSettings, Subscriptions.topics(topic))

最近,我尝试在一个有 15 个分区的主题上向更多的消费者(如 3)发送垃圾邮件。当我插入更多具有相同组 id 的消费者时,它友好地为每个消费者拆分 5 个分区,但它似乎不会同时消耗所有分区,似乎是一个接一个地读取,或者读取特定分区比其他分区快得多.

|Partition|LogSize  |Consumer Offset|Lag      |
|0        |8,429,145|      6,087,144|2,342,001|
|1        |8,424,948|      6,223,257|2,201,691|
|2        |8,428,121|      7,764,854|  663,267|
|3        |8,421,528|      6,071,425|2,350,103|
|4        |8,434,659|      7,351,552|1,083,107|
|5        |8,428,323|      5,935,336|2,492,987|
|6        |8,424,974|      6,455,301|1,969,673|
|7        |8,431,820|      7,763,984|  667,836|
|8        |8,425,999|      6,370,962|2,055,037|
|9        |8,416,354|      6,681,093|1,735,261|
|10       |8,416,217|      6,814,949|1,601,268|
|11       |8,428,026|      5,878,703|2,549,323|
|12       |8,424,604|      8,424,589|       15|
|13       |8,431,019|      8,431,019|        0|
|14       |8,423,218|      8,423,218|        0|

这是我正在运行的生产应用程序的真实示例。所以我有一些问题:

可以比其他分区快得多地读取某些分区吗?

请注意,这种行为仅在我启动多个消费者时才会发生。

我应该改变我的消费方式吗?我应该使用每个分区的源,还是有不同的选择?

更新

我怀疑插入多个消费者(读取多个应用程序)时可能会发生这种情况,但是今天只使用一个消费者就发生了,您可以通过查看消费者组来看到,这是相同的。

在此处输入图像描述

在它发生的时候,我还有 20MM 的消息等待处理(滞后)。上图是我们公司的Kafka经理拍的。

4

1 回答 1

0

我们通过删除将消息从一个主题复制到另一个主题的组件之一来解决此问题。

本质上,生产者正在写入一个主题,该组件将这些消息复制到另一个主题,启用压缩,保持给定 id 的最后状态。事实证明,这个组件没有正常工作,并且附加到这个压缩主题的消费者遇到了一些问题。

所以,最后,谁需要一个压缩主题,让生产者直接写给它。

于 2019-07-17T09:30:59.230 回答