n
我的生产者从单个输入消息生成任务并将它们发布到topic
.
要求是,在 的消费者组中的所有个人消费者中,任何人都不能在 1 小时内topic
处理超过 3 个这些任务中的任何一个。n
这意味着如果我想立即处理所有这些消息,我至少需要ceil(n/3)
消费者。如果消费者少于ceil(n/3)
消费者,那么我需要某种方式将消息推迟到num_processed < 3
最后一小时。
就实施此解决方案的实用性而言,我希望将 Kafka 与 Faust [1] 一起使用,但如有必要,我也可以访问 Redis。
到目前为止,我的想法是确保ceil(n/3)
在生产时至少有消费者,然后只使用生产者的循环分配任务topic
。无论如何,这是最佳解决方案,因为它无需等待长达 1 小时来处理消息。然而,这只会在足够多的消费者死亡之后才会起作用,因此同一消费者很可能会在 1 小时内处理超过 3 个。这是无法接受的。
另一个想法可能是让消费者在每次收到消息时检查他们是否已经执行了 3 个n
任务,如果是,则以某种方式请求另一个消费者处理它 - 但我在 Kafka 中找不到任何合适的机制来启用此功能.