0

我正在尝试使用 NodeJS 客户端发送 Kafka 消息:

const payload = {
        ......
    }
    const sendMessage = () => {
        return producer
            .send({
                topic: KAFKA_PRODUCER,
                compression: CompressionTypes.GZIP,
                messages: [{value: JSON.stringify(payload)}]
            })
            .then(e => {
                res.send({status: "ok"});
            })
            .catch(e => {
                res.status(503).send({status: "fail"});
            })
    }
    await producer.connect()
        .then(sendMessage())
        .catch(e => {
            console.error(`Unable to connect: ${e.message}`, e);
            res.status(503).send({status: "fail"});
        });

但是当我第一次发送消息时,我总是会收到这个错误:

Failed to send Kafka message: The producer is disconnected KafkaJSError: The producer is disconnected
    at validateConnectionStatus (/Users/server/node_modules/kafkajs/src/producer/messageProducer.js:31:15)
    at sendBatch (/Users/server/node_modules/kafkajs/src/producer/messageProducer.js:82:5)
    at Object.send (/Users/server/node_modules/kafkajs/src/producer/messageProducer.js:120:12)
    at sendMessage (/Users/server/server.js:615:14)
    at /Users/server/server.js:630:15
    at Layer.handle [as handle_request] (/Users/server/node_modules/express/lib/router/layer.js:95:5)
    at next (/Users/server/node_modules/express/lib/router/route.js:137:13)
    at isUserSplitLimitReached (/Users/server/server.js:886:16)
    at Layer.handle [as handle_request] (/Users/server/node_modules/express/lib/router/layer.js:95:5)
    at next (/Users/server/node_modules/express/lib/router/route.js:137:13) {
  retriable: true,
  helpUrl: undefined
}

我第二次发送消息时它工作正常。

你知道我该如何解决这个问题吗?我使用的是 "kafkajs": "^1.15.0"。

4

0 回答 0