0

在连接消费者组之前,我正在尝试通过管理对象重置 kafka 偏移量。

我打电话

await admin.setOffsets({groupId: gID, topic, partitions: await admin.fetchTopicOffsetsByTimestamp(topic, 1635358089189) });

在开始 kafka 消费者之前,但我最终得到了这个错误: The consumer group must have no running instances, current state: Stable有时,并非总是如此,但几乎在每次交替运行时。

我正在遵循的步骤

  1. 首先让 kafka 消费者从最新的偏移量开始消费。
  2. 通过 终止进程ctrl + c,它调用下面的方法(测试突然终止)
await consumerGroup.stop();
await consumerGroup.stop()
process.kill(process.pid)
process.exit(0)
  1. 我立即重新启动我的服务器。

  2. 在这次运行中,kafka 管理员将首先通过上面的代码重置偏移量,然后它会连接消费者来消费消息。

  3. 重新启动服务器后的某些时候,它工作正常,偏移量被重置并且消费者运行完美,但是当我重新启动服务器时,每隔一次运行我都会收到这个错误:The consumer group must have no running instances, current state: Stable

预期行为 当我调用 consumerGroup.stop() 和 consumerGroup.stop() 并终止进程时,服务器重新启动时不应有任何正在运行的消费者组实例

观察到的行为 即使在停止和断开消费者之后,仍然有任何消费者组处于运行状态,因此我无法重置偏移量,因为如果有任何正在运行的消费者实例,我们无法重置偏移量。参考链接: The consumer group must have no running instances when performing the reset https ://kafka.js.org/docs/admin#a-name-reset-offsets-by-timestamp-a-reset-consumer-group-offsets-by-timestamp

环境:

  • Mac 操作系统 11.2.1
  • KafkaJS 版本:1.15.0
  • 卡夫卡版本:2.8.1
  • NodeJS 版本:v13.8.0

附加上下文 我在这里遗漏了什么吗?我是 node.js 和 kafka 的初学者,可能在这里遗漏了一些明显的东西,所以请指出,或者随时提出任何其他解决方案来解决这个问题,谢谢。

4

0 回答 0