场景:
我将 autocommit 设置为 false,
产生 12 条消息
消耗它们..(比如从偏移量 100)
关闭消费者
启动一个新消费者
在那个阶段,我希望第二个消费者从偏移量 100 开始再次读取所有消息(因为没有完成提交)
但是当产生新消息时,我看到第二个消费者从新的偏移量(113)开始,即提交仍然以某种方式发生..
我怎么了?
那是我的消费者代码
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['192.168.14.10:9095']
});
const admin = kafka.admin();
const consumer = kafka.consumer({ groupId: 'test-group' });
const run = async () => {
// admin
await admin.connect();
// Consuming
await consumer.connect();
await consumer.subscribe({ topic: 'topic-test2'});
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString()
});
}
}
});
};
run().catch(console.error);