0

我正在使用 Kafka 框架提供的默认 server.properties/zookeeper.properties 文件。

我正在尝试创建一个简单的 NodeJS 应用程序,它将向 Producer 发送消息并使用它们。

下面是 NodeJS 代码。

配置.js

module.exports = {
  kafka_topic: 'catalog',
  kafka_server: 'localhost:9092',
};

nodejs-producer.js

const kafka = require('kafka-node');
const config = require('./config');

try {

    // set the desired timeout in options
    const options = {
        timeout: 5000,
    };
    const Producer = kafka.Producer;
    const client = new kafka.KafkaClient({kafkaHost: config.kafka_server, requestTimeout: 5000});
    const producer = new Producer(client);
    const kafka_topic = config.kafka_topic;
    let payloads = [
        {
            topic: kafka_topic,
            messages: 'This is test message'
        }
    ];

    producer.on('ready', async function() {
        let push_status = producer.send(payloads, (err, data) => {
            if (err) {
                console.log(err.toString());
                console.log('[kafka-producer -> '+kafka_topic+']: broker update failed');
            } else {
                console.log(data.toString());
                console.log('[kafka-producer -> '+kafka_topic+']: broker update success');
            }
        });
    });

    producer.on('error', function(err) {
        console.log(err);
        console.log('[kafka-producer -> '+kafka_topic+']: connection errored');
        throw err;
    });
}
catch(e) {
    console.log(e);
}

kafka 版本 = 2.8.0 kafka 节点版本 = 5.0.0

我收到错误消息 - 错误:LeaderNotAvailable

如何解决这个问题?我尝试在 server.properties 文件中使用不同的值,例如adverted.listeners,但没有得到解决方案。

4

2 回答 2

0

我已经在这里回答了这个问题

简而言之:当尝试向不存在的主题生成消息时会发生此问题。

您可以将您的 kafka 安装配置为在这种情况下自动创建主题:接下来会发生什么 - 按顺序:您仍然会收到错误消息并且框架将创建主题。在我的情况下,我不得不第二次重新生成相同的消息,但这是在旧版本的 Kafka 上。

编辑: 这里有一个帖子的链接,它解释了如何设置您的 kafka 配置以自动创建 kafka 主题。

于 2021-09-14T12:34:07.240 回答
0

我在发送消息时也遇到了同样的问题。我通过在有效负载中添加一个分区解决了这个问题,并且消费者也使用了相同的分区。

我用过的代码

于 2022-01-03T11:27:35.333 回答