
I have a consumer group reading from a topic with ten partitions:

[root@kafka01 kafka]# ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group ssIncomingGroup --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                        HOST            CLIENT-ID
ssIncomingGroup ssIncoming      3          13688           13987           299             ssTS@influx01 (github.com/segmentio/kafka-go)-f1c5b4c7-9cf0-4132-902a-db9d0429d520 /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      7          13484           13868           384             ssTS@influx01 (github.com/segmentio/kafka-go)-f1c5b4c7-9cf0-4132-902a-db9d0429d520 /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      2          13322           13698           376             ssTS@influx01 (github.com/segmentio/kafka-go)-20ee82a9-825d-4d9a-9f20-f4610c21f171 /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      8          13612           13899           287             ssTS@influx01 (github.com/segmentio/kafka-go)-20ee82a9-825d-4d9a-9f20-f4610c21f171 /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      1          13568           13932           364             ssTS@influx01 (github.com/segmentio/kafka-go)-df68ca85-d722-47ef-82c2-2fd60e186fac /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      6          13651           13950           299             ssTS@influx01 (github.com/segmentio/kafka-go)-df68ca85-d722-47ef-82c2-2fd60e186fac /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      0          13609           13896           287             ssTS@influx01 (github.com/segmentio/kafka-go)-10b7f10f-9535-4338-9851-f583a9a7c935 /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      5          13646           13945           299             ssTS@influx01 (github.com/segmentio/kafka-go)-10b7f10f-9535-4338-9851-f583a9a7c935 /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      4          13543           13843           300             ssTS@influx01 (github.com/segmentio/kafka-go)-3c847add-172f-4007-adf2-ce486686dd7c /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      9          13652           13951           299             ssTS@influx01 (github.com/segmentio/kafka-go)-3c847add-172f-4007-adf2-ce486686dd7c /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)

I'm using Segment's Kafka library for Go: "github.com/segmentio/kafka-go".

My Kafka writer looks like this:

kafkaWriter := kafka.NewWriter(kafka.WriterConfig{
    Async:         false,
    Brokers:       config.KafkaHosts,  // a string slice of 4 Kafka hosts
    QueueCapacity: kafkaQueueCapacity,
    Topic:         kafkaTopic,
    Balancer: &kafka.LeastBytes{},  // Same result with the default round-robin balancer
})
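
For reference, the writer is driven roughly like this (a simplified sketch rather than my exact code; it assumes the kafkaWriter above plus the standard context import):

// Sketch: send a batch through the writer above. With Async: false,
// WriteMessages blocks until the brokers have accepted the batch.
func produceBatch(ctx context.Context, w *kafka.Writer, payloads [][]byte) error {
    msgs := make([]kafka.Message, 0, len(payloads))
    for _, p := range payloads {
        // No Key is set, so the configured Balancer (LeastBytes here)
        // picks a partition for each message.
        msgs = append(msgs, kafka.Message{Value: p})
    }
    return w.WriteMessages(ctx, msgs...)
}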

My Kafka reader looks like this:

kafkaReader := kafka.NewReader(kafka.ReaderConfig{
    Brokers: config.KafkaHosts,  // same as above
    GroupID: config.KafkaGroup,
    Topic:   config.KafkaTopic,  // same as above
})
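
The consuming side is essentially a loop like this (again a simplified sketch; with GroupID set, ReadMessage also commits the offset back to the group):

// Sketch: group-mode read loop for the kafkaReader above.
func consumeLoop(ctx context.Context, r *kafka.Reader) error {
    for {
        m, err := r.ReadMessage(ctx) // blocks; commits the offset on success
        if err != nil {
            return err // e.g. context cancelled or reader closed
        }
        log.Printf("partition=%d offset=%d bytes=%d", m.Partition, m.Offset, len(m.Value))
    }
}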

The topic was originally created like this:

conn.CreateTopics(kafka.TopicConfig{
    NumPartitions:     config.KafkaPartitions,  // == 10
    ReplicationFactor: config.KafkaReplication,  // == 1
    Topic:             kafkaTopic,  // same as above
})
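
(The conn here comes from kafka-go's dialer, roughly as in the sketch below, which dials the controller broker explicitly since CreateTopics must be handled by the controller. The helper name, error handling, and net/strconv plumbing are illustrative only.)

// Sketch: dial a bootstrap broker, locate the controller, create the topic there.
func createTopic(bootstrap, topic string, partitions, replication int) error {
    conn, err := kafka.Dial("tcp", bootstrap)
    if err != nil {
        return err
    }
    defer conn.Close()

    controller, err := conn.Controller()
    if err != nil {
        return err
    }
    ctl, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    if err != nil {
        return err
    }
    defer ctl.Close()

    return ctl.CreateTopics(kafka.TopicConfig{
        Topic:             topic,
        NumPartitions:     partitions,
        ReplicationFactor: replication,
    })
}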

When I run my program and watch the host and network load, I see that almost all of the load/network activity hits just one of the four Kafka brokers. When I du the Kafka hosts' log directories, that same host has far more Kafka data on disk than the others (e.g. 150M instead of 15M).

What I want and expect is for the load to be spread across all four Kafka servers so that no single one becomes a bottleneck (CPU or network). Why isn't that happening?

Edit (adding requested command output):

[root@kafka01 kafka]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092                                                                                                                                    
Topic: ssIncoming       PartitionCount: 10      ReplicationFactor: 1    Configs: flush.ms=1000,segment.bytes=536870912,flush.messages=10000,retention.bytes=1073741824                                                    
        Topic: ssIncoming       Partition: 0    Leader: 4       Replicas: 4     Isr: 4                                                                                                                                    
        Topic: ssIncoming       Partition: 1    Leader: 2       Replicas: 2     Isr: 2                                                                                                                                    
        Topic: ssIncoming       Partition: 2    Leader: 3       Replicas: 3     Isr: 3                       
        Topic: ssIncoming       Partition: 3    Leader: 1       Replicas: 1     Isr: 1
        Topic: ssIncoming       Partition: 4    Leader: 4       Replicas: 4     Isr: 4                       
        Topic: ssIncoming       Partition: 5    Leader: 2       Replicas: 2     Isr: 2                      
        Topic: ssIncoming       Partition: 6    Leader: 3       Replicas: 3     Isr: 3                       
        Topic: ssIncoming       Partition: 7    Leader: 1       Replicas: 1     Isr: 1                       
        Topic: ssIncoming       Partition: 8    Leader: 4       Replicas: 4     Isr: 4                       
        Topic: ssIncoming       Partition: 9    Leader: 2       Replicas: 2     Isr: 2                       
Topic: __consumer_offsets       PartitionCount: 50      ReplicationFactor: 1    Configs: compression.type=producer,cleanup.policy=compact,flush.ms=1000,segment.bytes=104857600,flush.messages=10000,retention.bytes=1073741824
        Topic: __consumer_offsets       Partition: 0    Leader: 4       Replicas: 4     Isr: 4               
        Topic: __consumer_offsets       Partition: 1    Leader: 1       Replicas: 1     Isr: 1                                                                                                                            
        Topic: __consumer_offsets       Partition: 2    Leader: 2       Replicas: 2     Isr: 2               
        Topic: __consumer_offsets       Partition: 3    Leader: 3       Replicas: 3     Isr: 3               
        Topic: __consumer_offsets       Partition: 4    Leader: 4       Replicas: 4     Isr: 4               
        Topic: __consumer_offsets       Partition: 5    Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 6    Leader: 2       Replicas: 2     Isr: 2          
        Topic: __consumer_offsets       Partition: 7    Leader: 3       Replicas: 3     Isr: 3      
        Topic: __consumer_offsets       Partition: 8    Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 9    Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 10   Leader: 2       Replicas: 2     Isr: 2        
        Topic: __consumer_offsets       Partition: 11   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 12   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 13   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 14   Leader: 2       Replicas: 2     Isr: 2        
        Topic: __consumer_offsets       Partition: 15   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 16   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 17   Leader: 1       Replicas: 1     Isr: 1      
        Topic: __consumer_offsets       Partition: 18   Leader: 2       Replicas: 2     Isr: 2               
        Topic: __consumer_offsets       Partition: 19   Leader: 3       Replicas: 3     Isr: 3                                                                                                                            
        Topic: __consumer_offsets       Partition: 20   Leader: 4       Replicas: 4     Isr: 4                                                                                                                            
        Topic: __consumer_offsets       Partition: 21   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 22   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 23   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 24   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 25   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 26   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 27   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 28   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 29   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 30   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 31   Leader: 3       Replicas: 3     Isr: 3                                                                                                                            
        Topic: __consumer_offsets       Partition: 32   Leader: 4       Replicas: 4     Isr: 4               
        Topic: __consumer_offsets       Partition: 33   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 34   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 35   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 36   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 37   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 38   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 39   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 40   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 41   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 42   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 43   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 44   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 45   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 46   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 47   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 48   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 49   Leader: 1       Replicas: 1     Isr: 1

(Edit 2): These are the variables I use when generating the Kafka config files. They are the same for each of the 4 brokers.

scala_version: 2.12
kafka_config_broker_id: 0
kafka_config_log_dirs: "/tmp/kafka_logs"
kafka_config_log_flush_interval_messages: 10000
kafka_config_log_flush_interval_ms: 1000
kafka_config_log_retention_bytes: 1073741824
kafka_config_log_retention_check_interval: 60000
kafka_config_log_retention_hours: 168
kafka_config_log_segment_bytes: 536870912
kafka_config_num_io_threads: 4
kafka_config_num_network_threads: 2
kafka_config_num_partitions: 2
kafka_config_offsets_topic_replication_factor: 1
kafka_config_receive_buffer_bytes: 1048576
kafka_config_request_max_bytes: 104857600
kafka_config_send_buffer_bytes: 1048576
kafka_config_zookeeper_connection_timeout_ms: 1000000
kafka_config_zookeeper_servers:
    - consul01
    - consul02
    - consul03
kafka_exporter_version: 1.2.0
kafka_port: 9092
kafka_version: 2.4.0

This data is used in an Ansible template. The generated Kafka conf looks like this:

broker.id=1
port=9092
num.network.threads=2
num.io.threads=4
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka_logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=536870912
log.retention.check.interval.ms=60000

# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

offsets.topic.replication.factor=1

zookeeper.connect=consul01:2181,consul02:2181,consul03:2181
zookeeper.connection.timeout.ms=1000000

delete.topic.enable=true

Note that this is a development setup and the whole stack gets respun frequently (several times a day). The problem persists after every respin.


1 Answer


Right now the load looks well balanced (a way to check the leader layout from Go is sketched right after this list):

  • Partition leaders are spread across the brokers about as evenly as possible
    • Broker 1 is the leader for partitions 3, 7
    • Broker 2 is the leader for partitions 1, 5, 9
    • Broker 3 is the leader for partitions 2, 6
    • Broker 4 is the leader for partitions 0, 4, 8
  • Partitions are also assigned evenly to the consumers (2 partitions per consumer)
  • The offsets in the partitions are almost identical (so you seem to be producing to the partitions evenly)
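
If you want to double-check the leader layout from Go rather than the CLI, kafka-go can list it via ReadPartitions; a minimal sketch (the bootstrap address is just a placeholder):

// Sketch: print which broker leads each partition of a topic.
func printLeaders(bootstrap, topic string) error {
    conn, err := kafka.Dial("tcp", bootstrap)
    if err != nil {
        return err
    }
    defer conn.Close()

    parts, err := conn.ReadPartitions(topic)
    if err != nil {
        return err
    }
    for _, p := range parts {
        fmt.Printf("partition %d -> leader broker %d (%s:%d)\n", p.ID, p.Leader.ID, p.Leader.Host, p.Leader.Port)
    }
    return nil
}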

"When I du the Kafka hosts' log directories, that same host has far more Kafka data on disk than the others (e.g. 150M instead of 15M)."

The log offsets in the partitions are almost identical. But of course brokers 2 and 4 have to hold more data, because as you can see they lead more partitions. Their network traffic is also necessarily higher, since they each handle 3 partitions (fetch requests from consumers as well as produce requests from producers).

Still, ten times more data on one broker doesn't make sense. IMHO, at some point one or more brokers were unhealthy (unable to send heartbeats to Zookeeper, or simply down), the controller moved partition leadership to the healthy brokers, and for a while some brokers were leading more partitions than others. (By the way, for this scenario auto.leader.rebalance.enable must be true.)
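
For reference, the broker settings involved here are standard Kafka configs; the values below are the Kafka defaults, not something taken from your posted configuration:

auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10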

Note: I assume your broker configurations (especially the log.retention settings, which largely determine how much data a broker keeps) and the brokers' system resources are identical. If they are not, you should say so.


By the way, if you are not happy with the current assignment of partitions to brokers, you can change it manually with the kafka-reassign-partitions.sh tool. You just need to create a JSON file that specifies the replicas for each partition.

For example:

{"version":1,
  "partitions":[
     {"topic":"ssIncoming","partition":0,"replicas":[1]},
     {"topic":"ssIncoming","partition":1,"replicas":[1]},
     {"topic":"ssIncoming","partition":2,"replicas":[1]},
     {"topic":"ssIncoming","partition":3,"replicas":[2]},
     {"topic":"ssIncoming","partition":4,"replicas":[2]},
     {"topic":"ssIncoming","partition":5,"replicas":[3]},
     {"topic":"ssIncoming","partition":6,"replicas":[3]},
     {"topic":"ssIncoming","partition":7,"replicas":[3]},
     {"topic":"ssIncoming","partition":8,"replicas":[4]},
     {"topic":"ssIncoming","partition":9,"replicas":[4]}
]}

Then you just need to run this command:

./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file change-replicas.json --execute
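
Once the reassignment has been started, the same tool can report its status with --verify (same JSON file assumed):

./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file change-replicas.json --verify
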
Answered on 2020-02-16T06:48:56.840