在连接消费者组之前,我正在尝试通过管理对象重置 kafka 偏移量。
我打电话
await admin.setOffsets({groupId: gID, topic, partitions: await admin.fetchTopicOffsetsByTimestamp(topic, 1635358089189) });
在开始 kafka 消费者之前,但我最终得到了这个错误: The consumer group must have no running instances, current state: Stable
有时,并非总是如此,但几乎在每次交替运行时。
我正在遵循的步骤
- 首先让 kafka 消费者从最新的偏移量开始消费。
- 通过 终止进程
ctrl + c
,它调用下面的方法(测试突然终止)
await consumerGroup.stop();
await consumerGroup.stop()
process.kill(process.pid)
process.exit(0)
我立即重新启动我的服务器。
在这次运行中,kafka 管理员将首先通过上面的代码重置偏移量,然后它会连接消费者来消费消息。
重新启动服务器后的某些时候,它工作正常,偏移量被重置并且消费者运行完美,但是当我重新启动服务器时,每隔一次运行我都会收到这个错误:
The consumer group must have no running instances, current state: Stable
预期行为 当我调用 consumerGroup.stop() 和 consumerGroup.stop() 并终止进程时,服务器重新启动时不应有任何正在运行的消费者组实例
观察到的行为
即使在停止和断开消费者之后,仍然有任何消费者组处于运行状态,因此我无法重置偏移量,因为如果有任何正在运行的消费者实例,我们无法重置偏移量。参考链接:
The consumer group must have no running instances when performing the reset
https ://kafka.js.org/docs/admin#a-name-reset-offsets-by-timestamp-a-reset-consumer-group-offsets-by-timestamp
环境:
- Mac 操作系统 11.2.1
- KafkaJS 版本:1.15.0
- 卡夫卡版本:2.8.1
- NodeJS 版本:v13.8.0
附加上下文 我在这里遗漏了什么吗?我是 node.js 和 kafka 的初学者,可能在这里遗漏了一些明显的东西,所以请指出,或者随时提出任何其他解决方案来解决这个问题,谢谢。