1

场景:
我将 autocommit 设置为 false,
产生 12 条消息
消耗它们..(比如从偏移量 100)
关闭消费者
启动一个新消费者

在那个阶段,我希望第二个消费者从偏移量 100 开始再次读取所有消息(因为没有完成提交)

但是当产生新消息时,我看到第二个消费者从新的偏移量(113)开始,即提交仍然以某种方式发生..

我怎么了?

那是我的消费者代码

const { Kafka } = require('kafkajs');
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['192.168.14.10:9095']
});
const admin = kafka.admin();
const consumer = kafka.consumer({ groupId: 'test-group' });
const run = async () => {
  // admin
  await admin.connect();
  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: 'topic-test2'});
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString()
      });
      }
    }
  });
};
run().catch(console.error);
4

0 回答 0