我正在尝试使用 NodeJS 客户端发送 Kafka 消息:
const payload = {
......
}
const sendMessage = () => {
return producer
.send({
topic: KAFKA_PRODUCER,
compression: CompressionTypes.GZIP,
messages: [{value: JSON.stringify(payload)}]
})
.then(e => {
res.send({status: "ok"});
})
.catch(e => {
res.status(503).send({status: "fail"});
})
}
await producer.connect()
.then(sendMessage())
.catch(e => {
console.error(`Unable to connect: ${e.message}`, e);
res.status(503).send({status: "fail"});
});
但是当我第一次发送消息时,我总是会收到这个错误:
Failed to send Kafka message: The producer is disconnected KafkaJSError: The producer is disconnected
at validateConnectionStatus (/Users/server/node_modules/kafkajs/src/producer/messageProducer.js:31:15)
at sendBatch (/Users/server/node_modules/kafkajs/src/producer/messageProducer.js:82:5)
at Object.send (/Users/server/node_modules/kafkajs/src/producer/messageProducer.js:120:12)
at sendMessage (/Users/server/server.js:615:14)
at /Users/server/server.js:630:15
at Layer.handle [as handle_request] (/Users/server/node_modules/express/lib/router/layer.js:95:5)
at next (/Users/server/node_modules/express/lib/router/route.js:137:13)
at isUserSplitLimitReached (/Users/server/server.js:886:16)
at Layer.handle [as handle_request] (/Users/server/node_modules/express/lib/router/layer.js:95:5)
at next (/Users/server/node_modules/express/lib/router/route.js:137:13) {
retriable: true,
helpUrl: undefined
}
我第二次发送消息时它工作正常。
你知道我该如何解决这个问题吗?我使用“kafkajs”:“^1.15.0”