2

我是 kafka 的新手,并使用 kafka-node 在 nodeJS 中实现它。我想在一个主题中创建 3 个分区并同时向所有主题发布消息。我尝试了以下代码,但这里只有一个分区正在创建,所有消息都将发送到该分区。谁能告诉我哪里出错了。太感谢了。

Abc.abcData = async() => {
    try 
    {
        var client = new kafka.KafkaClient();
        var topic = 'newTopic';
        var topicsToCreate = [
        {
            topic: topic,
            partitions: 3,
            replicationFactor: 2,
            replicaAssignment: [
            {
              partition: 0,
              replicas: [0]
            },
            {
                partition: 1,
                replicas: [1]
            },
            {
                partition: 2,
                replicas: [2]
            }
          ]
        },
        ]
        client.createTopics(topicsToCreate, (error, result) => {
        console.log(result);
        });
        var HighLevelProducer = kafka.HighLevelProducer;
        var producer = new HighLevelProducer(client);
        var payloads = [
        { topic: topic, messages: 'this is partition 1!!', partitions: 0},
        { topic: topic, messages: 'this is partition 2!!', partitions: 1},
        { topic: topic, messages: 'this is partition 3!!', partitions: 2}
        ];
        producer.on('ready', function () {
            producer.send(payloads, function (err, result) {
                if (err)
                    console.log(err);
                console.log(result);

            });
        });
        
    } 
    catch (err) 
    {
        console.error(err.message);
    }
};

我收到如下回复-

[ { topic: 'newTopic', error: "Topic 'newTopic' already exists." } ]
{"newTopic":{"0":6}}
4

1 回答 1

2

在这里,您在 kafka 服务器上使用了createTopics(),它仅在 Kafka 服务器上的auto.create.topics.enable设置为 true 时才有效。客户端只需向服务器发送元数据请求,服务器将自动创建主题。当 async 设置为 false 时,该方法在所有主题创建完成后才返回,否则立即返回。因此,这里默认创建一个带有一个分区的主题。要创建多个分区或使其自定义,您必须在server.property文件中添加以下行 -

auto.create.topics.enable=false
于 2020-08-21T12:09:52.203 回答