1

我们观察到 Java Kafka Producer 0.9 客户端在发送小消息时性能非常差。消息不会累积到更大的请求批次中,因此每个小记录都是单独发送的。

我们的客户端配置有什么问题?还是这是其他问题?


使用 Kafka 客户端 0.9.0.0。我们在 Kafka 未发布的 9.0.1 或 9.1 固定或未解决列表中没有看到任何相关的帖子,因此我们专注于我们的客户端配置和服务器实例。

我们理解 linger.ms 应该导致客户端将记录累积到一个批次中。

我们将 linger.ms 设置为 10(也尝试了 100 和 1000),但这些并没有导致批量累积记录。对于大约 100 字节的记录大小和 16K 的请求缓冲区大小,我们预计在单个请求中发送大约 160 条消息。

尽管已经分配了一个新的 Bluemix Messaging Hub (Kafka Server 0.9) 服务实例,但客户端的跟踪似乎表明该分区可能已满。测试客户端在没有其他 I/O 的情况下循环发送多条消息。


日志显示带有可疑行的重复序列:“ Wake up the sender since topic mytopic partition 0 is full or getting a new batch ”。

因此,在我们的测试用例中,新分配的分区本质上应该是空的,那么为什么生产者客户端会得到一个新批次呢?

2015-12-10 15:14:41,335 3677 [main] TRACE com.isllc.client.producer.ExploreProducer - 发送记录:Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00011 2015-12-10T15:14:41.335-05:00'  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - 发送记录 ProducerRecord(topic=mytopic, partition=null, key=[B@670b40af, value=[B @4923ab24 回调 null 到主题 mytopic 分区 0  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator - 为主题 mytopic 分区 0 分配一个新的 16384 字节消息缓冲区  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - 由于主题 mytopic 分区 0 已满或获得新批次,因此唤醒发件人  
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExplorerProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - 准备发送数据的节点:[Node(0, kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]  
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - 创建了 1 个生产请求:[ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963, request= RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer}, body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0, record_set=java.nio.HeapByteBuffer[pos=0 lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]  
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - 从节点 0 接收到相关 ID 为 11 的生产响应  
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExplorerProducer] TRACE org.apache.kafka.clients.producer.internals.RecordBatch - 向主题分区 mytopic-0 生成消息,基本偏移量为 130,错误:null。  
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - 发送返回的元数据:Topic='mytopic',Partition=0,Offset=130  
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - 发送记录:Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00012 2015-12-10T15:14:41.412-05:00'

对于发送的每条记录,日志条目都像上面一样重复

我们提供了以下属性文件:

2015-12-10 15:14:37,843 185 [main] INFO com.isllc.client.AbstractClient - 从 Kafka 客户端文件中检索到的属性:kafka-producer.properties
2015-12-10 15:14:37,909 251 [主要] 信息 com.isllc.client.AbstractClient-acks=-1
2015-12-10 15:14:37,909 251 [主要] 信息 com.isllc.client.AbstractClient - ssl.protocol=TLSv1.2
2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - client.id=ExploreProducer
2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - ssl.truststore.identification.algorithm=HTTPS
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - ssl.truststore.password=changeit
2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - ssl.truststore.type=JKS
2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - ssl.enabled.protocols=TLSv1.2
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib /安全/cacerts
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - bootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub .services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05 -prod01.messagehub.services.us-south.bluemix.net:9094
2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient-security.protocol=SASL_SSL

另外,我们在代码中添加了 linger.ms=10。

Kafka 客户端显示扩展/合并的配置列表(并显示 linger.ms 设置):

2015-12-10 15:14:37,970 312 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig 值:
    压缩类型=无
    metric.reporters = []
    元数据.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    重新连接.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9094, kafka02-prod01.messagehub.services.us-south.bluemix.net:9094, kafka03-prod01.messagehub.services.us -south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
    重试.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    缓冲区.内存 = 33554432
    超时.ms = 30000
    key.serializer = 类 org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    sasl.kerberos.min.time.before.relogin = 60000
    连接数.max.idle.ms = 540000
    ssl.truststore.password = [隐藏]
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id = 探索生产者
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLSv1.2
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2]
    确认 = -1
    批量大小 = 16384
    ssl.keystore.location = null
    接收缓冲区字节 = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = SASL_SSL
    重试 = 0
    最大请求大小 = 1048576
    value.serializer = 类 org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.truststore.location = /Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    指标.sample.window.ms = 30000
    partitioner.class = 类 org.apache.kafka.clients.producer.internals.DefaultPartitioner
    发送缓冲区字节 = 131072
    逗留.ms = 10

发送 100 条记录后的 Kafka 指标:

100 次发送的持续时间为 8787 毫秒。发送了 7687 个字节。  
    batch-size-avg = 109.87 [每个请求每个分区发送的平均字节数。]  
    batch-size-max = 110.0 [每个请求每个分区发送的最大字节数。]  
    buffer-available-bytes = 3.3554432E7 [未使用的缓冲内存总量(未分配或在空闲列表中)。]  
    buffer-exhausted-rate = 0.0 [由于缓冲区耗尽而丢弃的平均每秒记录发送数]  
    buffer-total-bytes = 3.3554432E7 [客户端可以使用的最大缓冲内存量(无论当前是否使用)。]  
    bufferpool-wait-ratio = 0.0 [appender 等待空间分配的时间分数。]  
    字节率 = 291.8348916277093 []  
    压缩率 = 0.0 []  
    compression-rate-avg = 0.0 [记录批次的平均压缩率。]  
    connection-close-rate = 0.0 [窗口中每秒关闭的连接数。]  
    connection-count = 2.0 [当前活动连接数。]  
    connection-creation-rate = 0.05180541884681138 [窗口中每秒建立的新连接。]  
    传入字节率 = 10.342564641029007 []  
    io-ratio = 0.0038877559207471236 [I/O 线程花费在 I/O 上的时间分数]  
    io-time-ns-avg = 353749.2840375587 [每次选择调用的 I/O 平均时间长度,以纳秒为单位。]  
    io-wait-ratio = 0.21531227995769162 [I/O 线程等待的时间分数。]  
    io-wait-time-ns-avg = 1.9591901192488264E7 [I/O 线程等待套接字准备好读取或写入的平均时间,以纳秒为单位。]
    metadata-age = 8.096 [当前使用的生产者元数据的年龄,以秒为单位。]  
    network-io-rate = 5.2937784999213795 [每秒所有连接上的平均网络操作(读取或写入)数。]  
    传出字节率 = 451.2298783403283 []  
    generate-throttle-time-avg = 0.0 [平均油门时间,以毫秒为单位]  
    producer-throttle-time-max = 0.0 [最大油门时间,以毫秒为单位]  
    record-error-rate = 0.0 [导致错误的平均每秒记录发送数]  
    record-queue-time-avg = 15.5 [在记录累加器中花费的平均记录批次的毫秒时间。]  
    record-queue-time-max = 434.0 [在记录累加器中花费的最大记录批次时间,以毫秒为单位。]  
    记录重试率 = 0.0 []  
    record-send-rate = 2.65611304417116 [每秒发送的平均记录数。]  
    record-size-avg = 97.87 [平均记录大小]  
    record-size-max = 98.0 [最大记录大小]  
    records-per-request-avg = 1.0 [每个请求的平均记录数。]  
    request-latency-avg = 0.0 [平均请求延迟,以毫秒为单位]  
    请求延迟最大值 = 74.0 []  
    request-rate = 2.6468892499606897 [每秒发送的平均请求数。]  
    request-size-avg = 42.0 [窗口中所有请求的平均大小..]  
    request-size-max = 170.0 [在窗口中发送的任何请求的最大大小。]  
    requests-in-flight = 0.0 [等待响应的当前正在进行的请求数。]  
    response-rate = 2.651196976060479 [每秒收到的平均响应数。]  
    select-rate = 10.989861465830819 [I/O 层每秒检查新 I/O 执行的次数]  
    waiting-threads = 0.0 [阻塞等待缓冲区内存入队记录的用户线程数]  

谢谢

4

1 回答 1

5

通过查看我们的应用程序代码,Kafka 用户邮件列表中的 Guozhang Wang 能够识别出问题:

国章,

是的 - 你发现了问题!

我们插入了 .get() 进行调试,但没有想到(巨大的!)副作用。

使用异步回调效果很好。

我们现在能够在 14 秒内将 100,000 条记录从笔记本电脑发送到 Bluemix 云 - 速度快了约 1000 倍,

非常感谢!

加里


2015 年 12 月 13 日下午 2:48,王国璋写道:

加里,

您正在调用“kafkaProducer.send(record).get();” 对于每条消息,get() 调用会阻塞,直到 Future 被初始化,这有效地同步了通过在发送下一条消息之前为每条消息请求 ACK 发送的所有消息,因此没有批处理。

您可以尝试使用“send(record, callback)”进行异步发送,并让回调处理返回元数据中的错误。

国章

于 2015-12-13T22:38:59.447 回答