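Now I want to use node-rdkafka in our service, but I keep running into this error: Broker: Unknown member. The same issue on GitHub is https://github.com/confluentinc/confluent-kafka-dotnet/issues/1464. The explanation there is that a consumer with the same group id is retrying or being delayed, but I can't find any retries or delays in my code. Another related issue is https://github.com/confluentinc/confluent-kafka-python/issues/1004, but I have rechecked all the consumer group IDs and each one is unique.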
The node-rdkafka producer configuration is as follows:
this.producer = new Producer({
"client.id": this.cliendID,
"metadata.broker.list": this.brokerList,
'compression.codec': "lz4",
'retry.backoff.ms': 200,
'socket.keepalive.enable': true,
'queue.buffering.max.messages': 100000,
'queue.buffering.max.ms': 1000,
'batch.num.messages': 1000000,
"transaction.timeout.ms": 2000,
"enable.idempotence": false,
"max.in.flight.requests.per.connection": 1,
"debug": this.debug,
'dr_cb': true,
"retries": 0,
"log_cb": (_: any) => console.log(`log_cb =>`, _),
"sasl.username": this.saslUsername,
"sasl.password": this.saslPassword,
"sasl.mechanism": this.saslMechanism,
"security.protocol": this.securityProtocol
}, {
"acks": -1
})
The node-rdkafka consumer configuration is as follows:
this.consumer = new KafkaConsumer({
'group.id': this.groupID,
'metadata.broker.list': this.brokerList,
"sasl.username": this.saslUsername,
"sasl.password": this.saslPassword,
"enable.auto.commit": false,
"auto.commit.interval.ms": 2000,
"session.timeout.ms": 45000,
"max.poll.interval.ms": 300000,
"heartbeat.interval.ms": 3000,
"api.version.request.timeout.ms": 10000,
"max.in.flight.requests.per.connection": 1,
"debug": this.debug,
"sasl.mechanism": this.saslMechanism,
"log.connection.close": true,
"log.queue": true,
"log_level": 7,
"log.thread.name": true,
"isolation.level": "read_committed",
"ssl.ca.location": "/etc/ssl/certs/",
"log_cb": (_: any) => console.log(`log_cb =>`, _),
"security.protocol": this.securityProtocol
}, {})
await new Promise(resolve => {
this.consumer?.connect()
this.consumer?.on('ready', () => {
try {
this.consumer?.subscribe(subscriptions)
this.consumer?.consume()
console.log('[SUCCESS] Subscribe Event => all event')
} catch (err) {
console.log('[FAILED] Subscribe => all event')
console.log(err)
}
resolve(this.consumer)
}).on('data', async (data) => {
this.topicFunctionMap[data.topic]({
partition: data.partition,
topic: data.topic,
message: {
key: data.key,
offset: data.offset.toString(),
size: data.size,
value: data.value,
timestamp: data.timestamp?.toString()
}
} as ISubsCallbackParam)
this.consumer?.commitSync({
topic: data.topic,
offset: data.offset,
partition: data.partition
})
})
})
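The Broker: Unknown member error is thrown from the commitSync call inside the 'data' handler above. As a hedged sketch (not part of the original code), the commit could be wrapped so that errors meaning "this consumer's group membership is stale" are logged instead of escaping the handler. It assumes the thrown error exposes a numeric code property, which LibrdKafkaError appears to do, and uses the Kafka protocol error codes 22 (ILLEGAL_GENERATION, reported as "Specified group generation id is not valid"), 25 (UNKNOWN_MEMBER_ID, reported as "Unknown member") and 27 (REBALANCE_IN_PROGRESS). safeCommit and isStaleMembershipError are hypothetical names.

```typescript
// Kafka protocol error codes (as surfaced by librdkafka) indicating that this
// consumer's group membership or generation is stale, e.g. after eviction.
const GROUP_MEMBERSHIP_ERRORS: Record<number, string> = {
  22: "ILLEGAL_GENERATION", // "Specified group generation id is not valid"
  25: "UNKNOWN_MEMBER_ID", // "Unknown member"
  27: "REBALANCE_IN_PROGRESS",
};

// Assumption: the thrown error carries a numeric `code` field.
function isStaleMembershipError(err: unknown): boolean {
  if (typeof err !== "object" || err === null) return false;
  const code = (err as { code?: unknown }).code;
  return typeof code === "number" && code in GROUP_MEMBERSHIP_ERRORS;
}

// Runs a synchronous commit. Stale-membership failures are logged and swallowed
// (returns false); any other error is rethrown unchanged.
function safeCommit(commit: () => void, log: (msg: string) => void = console.log): boolean {
  try {
    commit();
    return true;
  } catch (err) {
    if (isStaleMembershipError(err)) {
      const code = (err as { code: number }).code;
      log(`[COMMIT_SKIPPED] ${GROUP_MEMBERSHIP_ERRORS[code]} (code ${code})`);
      return false;
    }
    throw err;
  }
}
```

In the handler this would be called as safeCommit(() => this.consumer?.commitSync({ ... })). Skipping the commit only avoids the crash: messages that were processed but not committed may be delivered again after the rebalance, so the reason the member was evicted still needs to be found.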
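With these configurations the consumer can receive events, but it doesn't last long: after about 2 hours it starts throwing these errors at random. I'm not sure whether the cause is the manual commit or our function taking a long time. I have tried both async and sync commits, so the commitSync call does not depend on our function.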
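For comparison, a minimal sketch of a manual-commit flow that waits for the handler to finish before committing. It assumes the topic handlers return a Promise; in the original code, this.topicFunctionMap[data.topic](...) is not awaited, so if it is asynchronous the commit can run before processing is done. The sketch also commits offset + 1, because Kafka treats the committed offset as the next offset to read. As far as I can tell, node-rdkafka's commitMessageSync(message) applies this +1 itself, while commitSync({ topic, partition, offset }) commits the offset exactly as given. ConsumedMessage, TopicPartitionOffset, commitPayload and processThenCommit are hypothetical names, not node-rdkafka typings.

```typescript
// Hypothetical minimal shapes; node-rdkafka's real message type has more fields.
interface ConsumedMessage {
  topic: string;
  partition: number;
  offset: number;
  value: Uint8Array | null;
}

interface TopicPartitionOffset {
  topic: string;
  partition: number;
  offset: number;
}

type Handler = (msg: ConsumedMessage) => Promise<void>;

// Kafka stores the *next* offset to consume, so commit offset + 1.
function commitPayload(msg: ConsumedMessage): TopicPartitionOffset {
  return { topic: msg.topic, partition: msg.partition, offset: msg.offset + 1 };
}

// Waits for the topic handler to finish, then commits via the injected function
// (e.g. a wrapper around consumer.commitSync).
async function processThenCommit(
  msg: ConsumedMessage,
  handlers: Record<string, Handler>,
  commit: (tpo: TopicPartitionOffset) => void,
): Promise<void> {
  const handler = handlers[msg.topic];
  if (!handler) {
    throw new Error(`no handler registered for topic ${msg.topic}`);
  }
  await handler(msg); // do not commit before processing has completed
  commit(commitPayload(msg));
}
```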
My suspicion is that our function takes a long time and gets our consumer kicked out of the group, especially since I later found an additional error: Broker: Specified group generation id is not valid.
Source: https://github.com/confluentinc/confluent-kafka-dotnet/issues/1155
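One way to test the slow-function suspicion is to time every handler call and flag calls that exceed a budget. This is a hypothetical helper, not a node-rdkafka API. Which budget actually matters is an assumption to verify, for example the 45000 ms session.timeout.ms or the 300000 ms max.poll.interval.ms from the consumer config above. The clock is injectable so the helper can be tested without real delays.

```typescript
// Hypothetical helper: wraps an async handler and reports calls slower than budgetMs.
function timed<T>(
  handler: (arg: T) => Promise<void>,
  budgetMs: number,
  onSlow: (elapsedMs: number, arg: T) => void,
  now: () => number = () => Date.now(),
): (arg: T) => Promise<void> {
  return async (arg: T) => {
    const start = now();
    try {
      await handler(arg);
    } finally {
      // Measured even if the handler throws, so failures are timed too.
      const elapsed = now() - start;
      if (elapsed > budgetMs) onSlow(elapsed, arg);
    }
  };
}
```

Usage would look like timed(myHandler, 30000, (ms) => console.warn(`slow handler: ${ms} ms`)), where myHandler and the 30000 ms budget are placeholders.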
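That issue says I need to increase the session timeout, so I tried raising it to "session.timeout.ms": 300000 (5 minutes) while keeping "heartbeat.interval.ms": 3000. I found in a GitHub issue that the heartbeat interval should be at most one third of the session timeout (heartbeat <= timeout / 3), so I think 3 seconds is fine.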
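The rule of thumb quoted above is simple arithmetic, and a small check makes it explicit: with a 300000 ms session timeout the heartbeat limit is 100000 ms, and with the original 45000 ms it is 15000 ms, so 3000 ms satisfies both. The names below are hypothetical.

```typescript
// Rule of thumb from the linked issues: heartbeat.interval.ms <= session.timeout.ms / 3.
interface GroupTimeouts {
  "session.timeout.ms": number;
  "heartbeat.interval.ms": number;
}

// Largest heartbeat interval (ms) allowed by the rule for a given session timeout.
function maxHeartbeatMs(sessionTimeoutMs: number): number {
  return Math.floor(sessionTimeoutMs / 3);
}

function heartbeatOk(cfg: GroupTimeouts): boolean {
  return cfg["heartbeat.interval.ms"] <= maxHeartbeatMs(cfg["session.timeout.ms"]);
}
```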
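With "session.timeout.ms": 300000 and "heartbeat.interval.ms": 3000, the consumer can consume and stays up for a long time, but the problems are:
- When first started with these settings, messages arrive within about 0-2 seconds.
- After a while, messages are still received, but each one takes 1-10 seconds to arrive.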
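To put numbers on the 0-2 s versus 1-10 s observation, one could record, for each message, the difference between the time it is received and its timestamp (data.timestamp in the handler above, assumed here to be milliseconds since the epoch; it may be undefined, and depending on the topic configuration it is either the producer's create time or the broker's append time). LatencyTracker is a hypothetical helper; negative values caused by clock skew are clamped to 0.

```typescript
// Hypothetical helper: records receive latency (receive time minus message timestamp, in ms).
class LatencyTracker {
  private samples: number[] = [];

  record(messageTimestampMs: number, receivedAtMs: number = Date.now()): number {
    const latency = Math.max(0, receivedAtMs - messageTimestampMs); // clamp clock skew
    this.samples.push(latency);
    return latency;
  }

  // Nearest-rank percentile over all recorded samples, p in (0, 100]; NaN if empty.
  percentile(p: number): number {
    if (this.samples.length === 0) return NaN;
    const sorted = [...this.samples].sort((a, b) => a - b);
    const rank = Math.ceil((p / 100) * sorted.length);
    return sorted[Math.min(sorted.length, Math.max(1, rank)) - 1];
  }
}
```

Logging percentile(50) and percentile(99) every few minutes would show whether latency grows steadily or jumps around rebalances.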
Detailed error:
received event => onCustomerServiceRegister
[COMMIT_ERR] LibrdKafkaError: Broker: Unknown member
at Function.createLibrdkafkaError [as create] (/src/app/node_modules/node-rdkafka/lib/error.js:454:10)
at KafkaConsumer.Client._errorWrap (/src/app/node_modules/node-rdkafka/lib/client.js:481:29)
at KafkaConsumer.commitSync (/src/app/node_modules/node-rdkafka/lib/kafka-consumer.js:560:8)
at KafkaRDConnect.<anonymous> (/src/app/dist/events/connectors/kafkaRD.js:240:110)
at step (/src/app/dist/events/connectors/kafkaRD.js:53:23)
at Object.next (/src/app/dist/events/connectors/kafkaRD.js:34:53)
at /src/app/dist/events/connectors/kafkaRD.js:28:71
at new Promise (<anonymous>)
at __awaiter (/src/app/dist/events/connectors/kafkaRD.js:24:12)
at KafkaConsumer.<anonymous> (/src/app/dist/events/connectors/kafkaRD.js:213:72)
at KafkaConsumer.emit (node:events:376:20)
at KafkaConsumer.EventEmitter.emit (node:domain:470:12)
at /src/app/node_modules/node-rdkafka/lib/kafka-consumer.js:488:12 {