2

我正在使用 Kafka Node 库,并测试高级生产者。

我创建了一个包含 10 个分区的主题“HLPTestInput”,并编写了一个函数以每秒生成一次。

生产者写入分区 0、2、4、6 和 8,但不写入奇数分区。

奇怪的是,当我从这个主题消费并生产到第二个主题“HLPTestInputFromConsumer”时,它有 5 个分区,消息被写入所有分区。

有没有我缺少的配置?

const kafka = require('kafka-node'),
    HighLevelProducer = kafka.HighLevelProducer,
    ConsumerGroup = kafka.ConsumerGroup,
    client = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
    producer = new HighLevelProducer(client),
    consumer = new ConsumerGroup(
        {
          kafkaHost: 'smc-dev.silverbolt.lab:9092',
            groupId: 'testGroup'
        },
        'HLPTestInput'
    );

let index = 0;
setInterval(() => {
    producer.send([{
        topic: 'HLPTestInput',
        messages: [index]
    }], (err, data) => {
        console.log('produced', data);
    });
    index++;
}, 1000);

consumer.on('message', (message) => {
    console.log('consumed', message);
    producer.send([{
        topic: 'HLPTestInputFromConsumer',
        messages: [message]
    }], (err, data) => {
        console.log('produced to secondary', data);
    });
});
4

1 回答 1

2

我不太确定,但这可能是因为你使用同一个制作人来写两个不同的主题。由于 HighLevelProducer 使用循环来编写。因此,假设您的生产者在“HLPTestInput”主题中写入,然后您将时间间隔设置为 1000,因此在此期间,您的消费者收到消息,现在您的生产者在“HLPTestInputFromConsumer”主题中写入。

因此,您的生产者在其分区 0、2、4 中写入“HLPTestInput”主题...

和“HLPTestInputFromConsumer”主题在其部分 1,3,5 ...

所以我建议尝试创建另一个生产者。然后它应该可以正常工作。

试试下面的代码:

const kafka = require('kafka-node'),
    HighLevelProducer = kafka.HighLevelProducer,
    ConsumerGroup = kafka.ConsumerGroup,
    client = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
    client1 = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
    producer = new HighLevelProducer(client),
    producer1 = new HighLevelProducer(client1),
    consumer = new ConsumerGroup(
       {
          kafkaHost: 'smc-dev.silverbolt.lab:9092',
           groupId: 'testGroup'
        },
        'HLPTestInput'
    );
let index = 0;
    setInterval(() => {
    producer.send([{
        topic: 'HLPTestInput',
        messages: [index]
    }], (err, data) => {
        console.log('produced', data);
    });
   index++;
}, 1000);

consumer.on('message', (message) => {
    console.log('consumed', message);
    producer1.send([{
        topic: 'HLPTestInputFromConsumer',
        messages: [message]
    }], (err, data) => {
        console.log('produced to secondary', data);
    });
});
于 2020-08-19T15:19:27.323 回答