I have a consumer group reading from a topic with ten partitions:
[root@kafka01 kafka]# ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group ssIncomingGroup --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
ssIncomingGroup ssIncoming 3 13688 13987 299 ssTS@influx01 (github.com/segmentio/kafka-go)-f1c5b4c7-9cf0-4132-902a-db9d0429d520 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 7 13484 13868 384 ssTS@influx01 (github.com/segmentio/kafka-go)-f1c5b4c7-9cf0-4132-902a-db9d0429d520 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 2 13322 13698 376 ssTS@influx01 (github.com/segmentio/kafka-go)-20ee82a9-825d-4d9a-9f20-f4610c21f171 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 8 13612 13899 287 ssTS@influx01 (github.com/segmentio/kafka-go)-20ee82a9-825d-4d9a-9f20-f4610c21f171 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 1 13568 13932 364 ssTS@influx01 (github.com/segmentio/kafka-go)-df68ca85-d722-47ef-82c2-2fd60e186fac /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 6 13651 13950 299 ssTS@influx01 (github.com/segmentio/kafka-go)-df68ca85-d722-47ef-82c2-2fd60e186fac /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 0 13609 13896 287 ssTS@influx01 (github.com/segmentio/kafka-go)-10b7f10f-9535-4338-9851-f583a9a7c935 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 5 13646 13945 299 ssTS@influx01 (github.com/segmentio/kafka-go)-10b7f10f-9535-4338-9851-f583a9a7c935 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 4 13543 13843 300 ssTS@influx01 (github.com/segmentio/kafka-go)-3c847add-172f-4007-adf2-ce486686dd7c /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 9 13652 13951 299 ssTS@influx01 (github.com/segmentio/kafka-go)-3c847add-172f-4007-adf2-ce486686dd7c /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
I am using the Segment.io Kafka library for Go: "github.com/segmentio/kafka-go".
My Kafka writer looks like this:
kafkaWriter := kafka.NewWriter(kafka.WriterConfig{
	Async:         false,
	Brokers:       config.KafkaHosts, // a string slice of 4 Kafka hosts
	QueueCapacity: kafkaQueueCapacity,
	Topic:         kafkaTopic,
	Balancer:      &kafka.LeastBytes{}, // Same result with the default round-robin balancer
})
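For context, the produce path looks roughly like this (a minimal sketch; the payloads and error handling are illustrative, not my real code):
// Sketch of a produce call; the payloads are placeholders.
// No Message.Key is set, so partition selection is left entirely to the Balancer.
err := kafkaWriter.WriteMessages(context.Background(),
	kafka.Message{Value: []byte("datapoint-1")},
	kafka.Message{Value: []byte("datapoint-2")},
)
if err != nil {
	log.Printf("kafka write failed: %v", err)
}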
My Kafka reader looks like this:
kafkaReader := kafka.NewReader(kafka.ReaderConfig{
	Brokers: config.KafkaHosts, // same as above
	GroupID: config.KafkaGroup,
	Topic:   config.KafkaTopic, // same as above
})
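Each consumer instance then reads in a loop along these lines (again a sketch; the processing step is a placeholder):
// Sketch of the consume loop; kafkaReader is the reader configured above.
// With GroupID set, ReadMessage also handles offset commits for the group.
for {
	msg, err := kafkaReader.ReadMessage(context.Background())
	if err != nil {
		log.Printf("kafka read failed: %v", err)
		break
	}
	// Placeholder for real processing of msg.Value.
	log.Printf("got message: partition=%d offset=%d", msg.Partition, msg.Offset)
}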
The topic was originally created like this:
conn.CreateTopics(kafka.TopicConfig{
	NumPartitions:     config.KafkaPartitions,  // == 10
	ReplicationFactor: config.KafkaReplication, // == 1
	Topic:             kafkaTopic,              // same as above
})
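For completeness, conn above is a broker connection obtained roughly like this (a sketch; the broker address is a placeholder):
// Sketch of how conn is obtained; the address is a placeholder.
conn, err := kafka.Dial("tcp", "kafka01:9092")
if err != nil {
	log.Fatalf("failed to dial broker: %v", err)
}
defer conn.Close()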
When I run my program and watch host and network load, almost all of the load/network activity lands on one of the four Kafka brokers. When I du the Kafka log directories on the hosts, that same host holds far more Kafka data on the filesystem than the others (e.g., 150M instead of 15M).
What I want and expect is for the load to be spread across all four Kafka servers so that no single one becomes a bottleneck (in CPU or network). Why isn't that happening?
Edit (adding the requested command output):
[root@kafka01 kafka]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092
Topic: ssIncoming PartitionCount: 10 ReplicationFactor: 1 Configs: flush.ms=1000,segment.bytes=536870912,flush.messages=10000,retention.bytes=1073741824
Topic: ssIncoming Partition: 0 Leader: 4 Replicas: 4 Isr: 4
Topic: ssIncoming Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: ssIncoming Partition: 2 Leader: 3 Replicas: 3 Isr: 3
Topic: ssIncoming Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: ssIncoming Partition: 4 Leader: 4 Replicas: 4 Isr: 4
Topic: ssIncoming Partition: 5 Leader: 2 Replicas: 2 Isr: 2
Topic: ssIncoming Partition: 6 Leader: 3 Replicas: 3 Isr: 3
Topic: ssIncoming Partition: 7 Leader: 1 Replicas: 1 Isr: 1
Topic: ssIncoming Partition: 8 Leader: 4 Replicas: 4 Isr: 4
Topic: ssIncoming Partition: 9 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets PartitionCount: 50 ReplicationFactor: 1 Configs: compression.type=producer,cleanup.policy=compact,flush.ms=1000,segment.bytes=104857600,flush.messages=10000,retention.bytes=1073741824
Topic: __consumer_offsets Partition: 0 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 3 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 4 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 5 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 6 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 7 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 8 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 9 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 10 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 11 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 12 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 13 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 14 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 15 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 16 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 17 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 18 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 19 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 20 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 21 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 22 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 23 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 24 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 25 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 26 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 27 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 28 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 29 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 30 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 31 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 32 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 33 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 34 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 35 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 36 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 37 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 38 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 39 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 40 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 41 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 42 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 43 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 44 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 45 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 46 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 47 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 48 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 49 Leader: 1 Replicas: 1 Isr: 1
(Edit 2): These are the variables I use when generating the Kafka config files. They are identical for each of the 4 brokers.
scala_version: 2.12
kafka_config_broker_id: 0
kafka_config_log_dirs: "/tmp/kafka_logs"
kafka_config_log_flush_interval_messages: 10000
kafka_config_log_flush_interval_ms: 1000
kafka_config_log_retention_bytes: 1073741824
kafka_config_log_retention_check_interval: 60000
kafka_config_log_retention_hours: 168
kafka_config_log_segment_bytes: 536870912
kafka_config_num_io_threads: 4
kafka_config_num_network_threads: 2
kafka_config_num_partitions: 2
kafka_config_offsets_topic_replication_factor: 1
kafka_config_receive_buffer_bytes: 1048576
kafka_config_request_max_bytes: 104857600
kafka_config_send_buffer_bytes: 1048576
kafka_config_zookeeper_connection_timeout_ms: 1000000
kafka_config_zookeeper_servers:
- consul01
- consul02
- consul03
kafka_exporter_version: 1.2.0
kafka_port: 9092
kafka_version: 2.4.0
This data is used in an Ansible template. The generated Kafka conf looks like this:
broker.id=1
port=9092
num.network.threads=2
num.io.threads=4
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka_logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
offsets.topic.replication.factor=1
zookeeper.connect=consul01:2181,consul02:2181,consul03:2181
zookeeper.connection.timeout.ms=1000000
delete.topic.enable=true
Note that this is a development setup, and these hosts are torn down and re-provisioned frequently (several times a day). The problem persists after every re-spin.