我正在使用 Kafka 延迟主题消费consumer.pause(<partitions>)
。
Pub/Sub Kafka shim 将 pause 变成 NoOp:
是否有任何关于如何将 pub sub lite 主题的消费延迟设定的持续时间的文档?
即我想使用来自 Pub/Sub Lite 主题的所有消息,但有 4 分钟的综合延迟。
这是我使用 Kafka 原生的算法:
- 称呼
consumer.poll()
- 恢复所有分配的分区
consumer.resume(consumer.assignment())
- 将以前
delayed
的记录与最近轮询的记录结合起来 - 将记录分开
- 足以处理的记录
- 记录还太年轻,无法处理
- 为任何太年轻的记录暂停分区
consumer.pause(<partitions of too young>)
- 保留一个太年轻的记录缓冲区,以便在下一次通过时重新考虑,称为
delayed
- 处理足够老的记录
- 冲洗,重复
我们只提交足够老的记录的偏移量,如果进程死亡,“太年轻”缓冲区中的任何记录都将保持未提交,并且在随后的重新平衡中接收分区的任何消费者都会重新访问它们。
这种算法是否有更通用的形式可以与原生 Kafka 和 Pub/Sub Lite 一起使用?
编辑:CloudTasks 在这里是个坏主意,因为它断开了偏移提交链。我需要确保我只为从下游系统得到确认的记录提交偏移量。