我们观察到 Java Kafka Producer 0.9 客户端在发送小消息时性能非常差。消息不会累积到更大的请求批次中,因此每个小记录都是单独发送的。
我们的客户端配置有什么问题?还是这是其他问题?
使用 Kafka 客户端 0.9.0.0。我们在 Kafka 未发布的 9.0.1 或 9.1 固定或未解决列表中没有看到任何相关的帖子,因此我们专注于我们的客户端配置和服务器实例。
我们理解 linger.ms 应该导致客户端将记录累积到一个批次中。
我们将 linger.ms 设置为 10(也尝试了 100 和 1000),但这些并没有导致批量累积记录。对于大约 100 字节的记录大小和 16K 的请求缓冲区大小,我们预计在单个请求中发送大约 160 条消息。
尽管已经分配了一个新的 Bluemix Messaging Hub (Kafka Server 0.9) 服务实例,但客户端的跟踪似乎表明该分区可能已满。测试客户端在没有其他 I/O 的情况下循环发送多条消息。
日志显示带有可疑行的重复序列:“ Wake up the sender since topic mytopic partition 0 is full or getting a new batch ”。
因此,在我们的测试用例中,新分配的分区本质上应该是空的,那么为什么生产者客户端会得到一个新批次呢?
2015-12-10 15:14:41,335 3677 [main] TRACE com.isllc.client.producer.ExploreProducer - 发送记录:Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00011 2015-12-10T15:14:41.335-05:00' 2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - 发送记录 ProducerRecord(topic=mytopic, partition=null, key=[B@670b40af, value=[B @4923ab24 回调 null 到主题 mytopic 分区 0 2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator - 为主题 mytopic 分区 0 分配一个新的 16384 字节消息缓冲区 2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - 由于主题 mytopic 分区 0 已满或获得新批次,因此唤醒发件人 2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExplorerProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - 准备发送数据的节点:[Node(0, kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)] 2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - 创建了 1 个生产请求:[ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963, request= RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer}, body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0, record_set=java.nio.HeapByteBuffer[pos=0 lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)] 2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - 从节点 0 接收到相关 ID 为 11 的生产响应 2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExplorerProducer] TRACE org.apache.kafka.clients.producer.internals.RecordBatch - 向主题分区 mytopic-0 生成消息,基本偏移量为 130,错误:null。 2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - 发送返回的元数据:Topic='mytopic',Partition=0,Offset=130 2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - 发送记录:Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00012 2015-12-10T15:14:41.412-05:00' 对于发送的每条记录,日志条目都像上面一样重复
我们提供了以下属性文件:
2015-12-10 15:14:37,843 185 [main] INFO com.isllc.client.AbstractClient - 从 Kafka 客户端文件中检索到的属性:kafka-producer.properties 2015-12-10 15:14:37,909 251 [主要] 信息 com.isllc.client.AbstractClient-acks=-1 2015-12-10 15:14:37,909 251 [主要] 信息 com.isllc.client.AbstractClient - ssl.protocol=TLSv1.2 2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - client.id=ExploreProducer 2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - ssl.truststore.identification.algorithm=HTTPS 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - ssl.truststore.password=changeit 2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - ssl.truststore.type=JKS 2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient - ssl.enabled.protocols=TLSv1.2 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib /安全/cacerts 2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - bootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub .services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05 -prod01.messagehub.services.us-south.bluemix.net:9094 2015-12-10 15:14:37,910 252 [主要] 信息 com.isllc.client.AbstractClient-security.protocol=SASL_SSL 另外,我们在代码中添加了 linger.ms=10。
Kafka 客户端显示扩展/合并的配置列表(并显示 linger.ms 设置):
2015-12-10 15:14:37,970 312 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig 值: 压缩类型=无 metric.reporters = [] 元数据.max.age.ms = 300000 metadata.fetch.timeout.ms = 60000 重新连接.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9094, kafka02-prod01.messagehub.services.us-south.bluemix.net:9094, kafka03-prod01.messagehub.services.us -south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05-prod01.messagehub.services.us-south.bluemix.net:9094] 重试.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit 缓冲区.内存 = 33554432 超时.ms = 30000 key.serializer = 类 org.apache.kafka.common.serialization.ByteArraySerializer sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.keystore.type = JKS ssl.trustmanager.algorithm = PKIX block.on.buffer.full = false ssl.key.password = null max.block.ms = 60000 sasl.kerberos.min.time.before.relogin = 60000 连接数.max.idle.ms = 540000 ssl.truststore.password = [隐藏] max.in.flight.requests.per.connection = 5 metrics.num.samples = 2 client.id = 探索生产者 ssl.endpoint.identification.algorithm = null ssl.protocol = TLSv1.2 request.timeout.ms = 30000 ssl.provider = null ssl.enabled.protocols = [TLSv1.2] 确认 = -1 批量大小 = 16384 ssl.keystore.location = null 接收缓冲区字节 = 32768 ssl.cipher.suites = null ssl.truststore.type = JKS security.protocol = SASL_SSL 重试 = 0 最大请求大小 = 1048576 value.serializer = 类 org.apache.kafka.common.serialization.ByteArraySerializer ssl.truststore.location = /Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts ssl.keystore.password = null ssl.keymanager.algorithm = SunX509 指标.sample.window.ms = 30000 partitioner.class = 类 org.apache.kafka.clients.producer.internals.DefaultPartitioner 发送缓冲区字节 = 131072 逗留.ms = 10
发送 100 条记录后的 Kafka 指标:
100 次发送的持续时间为 8787 毫秒。发送了 7687 个字节。 batch-size-avg = 109.87 [每个请求每个分区发送的平均字节数。] batch-size-max = 110.0 [每个请求每个分区发送的最大字节数。] buffer-available-bytes = 3.3554432E7 [未使用的缓冲内存总量(未分配或在空闲列表中)。] buffer-exhausted-rate = 0.0 [由于缓冲区耗尽而丢弃的平均每秒记录发送数] buffer-total-bytes = 3.3554432E7 [客户端可以使用的最大缓冲内存量(无论当前是否使用)。] bufferpool-wait-ratio = 0.0 [appender 等待空间分配的时间分数。] 字节率 = 291.8348916277093 [] 压缩率 = 0.0 [] compression-rate-avg = 0.0 [记录批次的平均压缩率。] connection-close-rate = 0.0 [窗口中每秒关闭的连接数。] connection-count = 2.0 [当前活动连接数。] connection-creation-rate = 0.05180541884681138 [窗口中每秒建立的新连接。] 传入字节率 = 10.342564641029007 [] io-ratio = 0.0038877559207471236 [I/O 线程花费在 I/O 上的时间分数] io-time-ns-avg = 353749.2840375587 [每次选择调用的 I/O 平均时间长度,以纳秒为单位。] io-wait-ratio = 0.21531227995769162 [I/O 线程等待的时间分数。] io-wait-time-ns-avg = 1.9591901192488264E7 [I/O 线程等待套接字准备好读取或写入的平均时间,以纳秒为单位。] metadata-age = 8.096 [当前使用的生产者元数据的年龄,以秒为单位。] network-io-rate = 5.2937784999213795 [每秒所有连接上的平均网络操作(读取或写入)数。] 传出字节率 = 451.2298783403283 [] generate-throttle-time-avg = 0.0 [平均油门时间,以毫秒为单位] producer-throttle-time-max = 0.0 [最大油门时间,以毫秒为单位] record-error-rate = 0.0 [导致错误的平均每秒记录发送数] record-queue-time-avg = 15.5 [在记录累加器中花费的平均记录批次的毫秒时间。] record-queue-time-max = 434.0 [在记录累加器中花费的最大记录批次时间,以毫秒为单位。] 记录重试率 = 0.0 [] record-send-rate = 2.65611304417116 [每秒发送的平均记录数。] record-size-avg = 97.87 [平均记录大小] record-size-max = 98.0 [最大记录大小] records-per-request-avg = 1.0 [每个请求的平均记录数。] request-latency-avg = 0.0 [平均请求延迟,以毫秒为单位] 请求延迟最大值 = 74.0 [] request-rate = 2.6468892499606897 [每秒发送的平均请求数。] request-size-avg = 42.0 [窗口中所有请求的平均大小..] request-size-max = 170.0 [在窗口中发送的任何请求的最大大小。] requests-in-flight = 0.0 [等待响应的当前正在进行的请求数。] response-rate = 2.651196976060479 [每秒收到的平均响应数。] select-rate = 10.989861465830819 [I/O 层每秒检查新 I/O 执行的次数] waiting-threads = 0.0 [阻塞等待缓冲区内存入队记录的用户线程数]
谢谢