1

我正在尝试使用 Kafka 的 RoundRobinPartitioner 类在所有分区之间均匀分布消息。我的Kafka主题配置如下:

名称:multischemakafkatopicodd

分区数:16

复制因子:2

比如说,如果我产生 100 条消息,那么每个分区应该有 6 或 7 条消息。但是,我得到了类似的东西:

sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:26
multischemakafkatopicodd:5:0
multischemakafkatopicodd:10:24
multischemakafkatopicodd:15:0
multischemakafkatopicodd:13:0
multischemakafkatopicodd:8:26
multischemakafkatopicodd:2:26
multischemakafkatopicodd:12:24
multischemakafkatopicodd:14:24
multischemakafkatopicodd:9:0
multischemakafkatopicodd:11:0
multischemakafkatopicodd:4:26
multischemakafkatopicodd:1:0
multischemakafkatopicodd:6:24
multischemakafkatopicodd:7:0
multischemakafkatopicodd:3:0

我认为可能是我没有产生足够的消息,所以我尝试了 1M 记录并将分区数设置为奇数:

主题:multischemakafkatopicodd

分区数:31

复制因子:2

...我得到了这个。这一次,每个分区中的消息数量有点均匀分布。

sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:33845
multischemakafkatopicodd:5:34388
multischemakafkatopicodd:10:33837
multischemakafkatopicodd:20:33819
multischemakafkatopicodd:15:33890
multischemakafkatopicodd:25:34414
multischemakafkatopicodd:30:33862
multischemakafkatopicodd:26:34066
multischemakafkatopicodd:9:34088
multischemakafkatopicodd:11:34124
multischemakafkatopicodd:16:33802
multischemakafkatopicodd:4:34061
multischemakafkatopicodd:17:34977
multischemakafkatopicodd:3:34084
multischemakafkatopicodd:24:33849
multischemakafkatopicodd:23:34111
multischemakafkatopicodd:13:34062
multischemakafkatopicodd:28:33876
multischemakafkatopicodd:18:34098
multischemakafkatopicodd:22:34058
multischemakafkatopicodd:8:34079
multischemakafkatopicodd:2:33839
multischemakafkatopicodd:12:34075
multischemakafkatopicodd:29:34132
multischemakafkatopicodd:19:33924
multischemakafkatopicodd:14:34109
multischemakafkatopicodd:1:34088
multischemakafkatopicodd:6:33832
multischemakafkatopicodd:7:34080
multischemakafkatopicodd:27:34188
multischemakafkatopicodd:21:34684

我再次进行了相同的测试,但将分区数量减少到 8 个,我得到了这个结果,我们可以清楚地看到一些分区有接近 15K 的消息,而其他分区有大约 10K:

multischemakafkatopicodd:0:155927
multischemakafkatopicodd:5:105351
multischemakafkatopicodd:1:107382
multischemakafkatopicodd:4:160533
multischemakafkatopicodd:6:158007
multischemakafkatopicodd:7:105608
multischemakafkatopicodd:2:157934
multischemakafkatopicodd:3:105599

我做错了什么还是它应该如何工作?为什么消息的分布如此不均?

如果有人可以帮助我,那就太好了。谢谢。

4

1 回答 1

4

据我了解,分区程序运行良好。但是您必须了解生产者为了最大化性能所做的优化:

  • 生产者不会为每个 send call 将每条消息都生成到不同的分区,因为它会是矫枉过正。

  • Round-Robin保证类似的分布,但可以批量发送。这意味着,它将根据's 代码中的remainder不是模数!)操作缓冲一定数量的发往 partition 的消息:RoundRobinPartitioner

     int part = Utils.toPositive(nextValue) % availablePartitions.size();
    

nextValue是一个AtomicInteger为每个分区/发送调用递增 1。因此,余数也将始终加一(以循环方式,例如使用 4 个分区0-1-2-3-0-1-2-3-...:),假设在此过程中没有分区被声明为不可用。如果发生这种情况,循环可能看起来像0-1-2-(partition4fails)-0-1-2-(partition4OK)-3-0-...


例子

  • 4 个分区的主题
  • 每个分区生产者分区线程 缓冲区保存3 条消息

(消息编号计数器以 0 - 开头new AtomicInteger(0)

    MsgN % Partitions   Partition
        0%4                0
        1%4                1
        2%4                2
        3%4                3
        4%4                0
        5%4                1
        6%4                2 
        7%4                3
        8%4                0
        ...               ...

当产生第 9 条消息时,第一个分区的缓冲区已完成(因为它已经保存了 3 条消息),因此可以发送到 kafka。如果您在此处停止该过程,则 4 个分区将如下所示:

    Partition    Offset
       0           3
       1           0
       2           0
       3           0

当产生第10 条消息时,第二个分区的缓冲区也将准备好从线路中发送出去,主题如下所示:

    Partition    Offset
       0           3
       1           3
       2           0
       3           0

在现实生活中,缓冲区通常保存大量消息(也可以调整)。例如,假设存储了 1000 条消息。对于相同的场景,分区看起来像:

    Partition    Offset
       0           1000
       1           1000
       2           0
       3           0

因此增加了分区之间的“视觉”差异。更大的批处理大小/缓冲区大小将更加臭名昭著。

这与生产者线程本身的性质有关partitioner:默认情况下,它不会单独发送每条消息,而是存储它们以便在每次代理调用时发送多条消息,优化系统性能。

批处理是效率的主要驱动力之一,为了启用批处理,Kafka 生产者将尝试在内存中累积数据并在单个请求中发送更大的批处理

如果生产者停止/启动,这种不平衡可能会更加臭名昭著,因为无论先前选择的分区如何,它都会重新启动机制(因此它可以开始发送到在停止之前选择的同一个分区,从而增加与其他分区的差异上次执行的非选举分区)。

在新的执行中,缓冲区将全部为空,因此无论哪个分区接收最多,进程都会重新启动。

所以,你在这里停止这个过程:

    Partition    Offset
       0           1000
       1           1000
       2           0
       3           0

重新启动包含每个主题的消息数计数器的映射,因为它不是代理的一部分,而是来自生产者的 Partitioner类。如果生产者没有正确关闭和/或刷新,那些缓存的消息也会丢失。所以,在这种情况下,你得到的是对前面逻辑的重复:

    MsgN % Partitions   Partition
        0%4                0
        1%4                1
        2%4                2
        3%4                3
                 (...)

这将在特定时刻导致:

    Partition    Offset
       0          2000
       1          2000
       2           0
       3           0

这是由发送过程的非连续执行产生的不平衡,但它超出了 的范围RoundRobinPartitioner,其性质基于连续过程(不间断)。

You can verify this behaviour by checking each partition's offset while sending the messages: Only when the selected partition stores n messages, the next elected partition will get its batch of n messages.

注意:示例中显示的数字指的是“完美”场景;在现实生活中,消息也可以被撤销、压缩、失败、刷新,而不管缓冲区大小、分区不可用……导致偏移量,如您的问题中所示。

最后一个带有冲洗场景的示例:

    Partition    Offset
       0           1000
       1           1000
       2           0
       3           0

该过程已停止,但生产者已正确关闭并刷新其消息,因此主题如下所示:

    Partition    Offset
       0           1997
       1           1996
       2           999
       3           998

该过程重新启动。刷新第一个分区的缓冲区后,如下所示:

    Partition    Offset
       0           2997
       1           1996
       2           999
       3           998

因此增加了对机制“公平性”的混淆。但这不是它的错,因为分区器的映射、计数器和缓冲区中没有持久性。如果您让该过程连续几天执行,您会发现它确实以“近乎相等”的方式平衡了消息。


RoundRobinPartitioner相关方法

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, 
                     byte[] valueBytes, Cluster cluster) 
{
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions=cluster.availablePartitionsForTopic(topic);
    if (!availablePartitions.isEmpty()) { 
        /*remainder calculus in order to select next partition*/
        int part = Utils.toPositive(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // no partitions are available, give a non-available partition
        return Utils.toPositive(nextValue) % numPartitions;
    }
}

private int nextValue(String topic) 
{
  /*Counter of num messages sent. topicCounterMap is part of the producer 
   process, hence not persisted by default.
   It will start by 0 for every topic with each new launch*/
   AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
       return new AtomicInteger(0); });
   return counter.getAndIncrement();
}
于 2020-12-18T21:14:33.180 回答