我正在使用 kafka-node 模块向 kafka 发送消息。在集群环境中,我有一个具有 3 个分区和复制因子为 3 的主题。
主题描述是 -
Topic:clusterTopic PartitionCount:3 ReplicationFactor:3 Configs:min.insync.replicas=2,segment.bytes=1073741824
Topic: clusterTopic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: clusterTopic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
Topic: clusterTopic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
生产者配置 -
"requireAcks": 1,
"attributes": 2,
"partitionerType": 2,
"retries": 2
当我发送数据时,它遵循分区类型为循环(2),如循环方式
当我按照以下步骤操作时
- 获取连接到kafka:9092,kafka:9093的HighLevelProducer实例
- 发送一个消息
- 手动停止 kafka-server:9092
- 尝试使用 HighLevelProducer 发送另一条消息,并且 send() 将触发回调并出现错误:TimeoutError: Request timed out after 30000ms
我期望的是,如果一个分区不可访问(因为代理关闭),生产者应该自动将数据发送到下一个可用分区,但由于异常我丢失了消息
例外情况如下 -
TimeoutError: Request timed out after 3000ms
at new TimeoutError (\package\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9)
at Timeout.timeoutId._createTimeout [as _onTimeout] (\package\node_modules\kafka-node\lib\kafkaClient.js:980:14)
at ontimeout (timers.js:424:11)
at tryOnTimeout (timers.js:288:5)
at listOnTimeout (timers.js:251:5)
at Timer.processTimers (timers.js:211:10)
(node:56416) [DEP0079] DeprecationWarning: Custom inspection function on Objects via .inspect() is deprecated
kafka-node:KafkaClient kafka-node-client reconnecting to kafka1:9092 +3s
kafka-node:KafkaClient createBroker kafka1 9092 +1ms
kafka-node:KafkaClient kafka-node-client reconnecting to kafka1:9092 +3s
kafka-node:KafkaClient createBroker kafka1 9092 +0ms