0

我正在使用 Kafka 延迟主题消费consumer.pause(<partitions>)

Pub/Sub Kafka shim 将 pause 变成 NoOp:

https://github.com/googleapis/java-pubsublite-kafka/blob/v0.6.7/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java#L590-L600

是否有任何关于如何将 pub sub lite 主题的消费延迟设定的持续时间的文档?

即我想使用来自 Pub/Sub Lite 主题的所有消息,但有 4 分钟的综合延迟。

这是我使用 Kafka 原生的算法:

  • 称呼consumer.poll()
  • 恢复所有分配的分区consumer.resume(consumer.assignment())
  • 将以前delayed的记录与最近轮询的记录结合起来
  • 将记录分开
    • 足以处理的记录
    • 记录还太年轻,无法处理
  • 为任何太年轻的记录暂停分区consumer.pause(<partitions of too young>)
  • 保留一个太年轻的记录缓冲区,以便在下一次通过时重新考虑,称为delayed
  • 处理足够老的记录
  • 冲洗,重复

我们只提交足够老的记录的偏移量,如果进程死亡,“太年轻”缓冲区中的任何记录都将保持未提交,并且在随后的重新平衡中接收分区的任何消费者都会重新访问它们。

这种算法是否有更通用的形式可以与原生 Kafka 和 Pub/Sub Lite 一起使用?

编辑:CloudTasks 在这里是个坏主意,因为它断开了偏移提交链。我需要确保我只为从下游系统得到确认的记录提交偏移量。

4

1 回答 1

0

pause如果您删除andresume阶段,与上述类似的东西可能会正常工作。我会注意到,对于这两个系统,您不能保证在任何给定的 poll() 调用中直到现在都接收到服务器上存在的所有消息,因此如果您没有为给定分区提供任何记录,您可能会增加额外的延迟投票电话。

如果您在启用自动提交的情况下执行以下操作,您应该有效地延迟处理严格超过 4 分钟。

  1. 称呼consumer.poll()
  2. 睡到每条记录 4 分钟大
  3. 过程记录
  4. 转到 1。

如果您使用手动提交,您可以使每条消息的睡眠时间接近 4 分钟,但缺点是需要手动管理偏移量:

  1. 称呼consumer.poll()
  2. 将记录放入有序的每个分区缓冲区
  3. 睡眠直到任何分区的最旧记录是过去 4 分钟
  4. 处理过去 4 分钟以上的记录
  5. 已处理记录的提交偏移量
  6. 去 1
于 2021-11-18T16:11:17.393 回答