0

n我的生产者从单个输入消息生成任务并将它们发布到topic.

要求是,在 的消费者组中的所有个人消费者中,任何人都不能在 1 小时内topic处理超过 3 个这些任务中的任何一个。n

这意味着如果我想立即处理所有这些消息,我至少需要ceil(n/3)消费者。如果消费者少于ceil(n/3)消费者,那么我需要某种方式将消息推迟到num_processed < 3最后一小时。

就实施此解决方案的实用性而言,我希望将 Kafka 与 Faust [1] 一起使用,但如有必要,我也可以访问 Redis。

到目前为止,我的想法是确保ceil(n/3)在生产时至少有消费者,然后只使用生产者的循环分配任务topic。无论如何,这是最佳解决方案,因为它无需等待长达 1 小时来处理消息。然而,这只会在足够多的消费者死亡之后才会起作用,因此同一消费者很可能会在 1 小时内处理超过 3 个。这是无法接受的。

另一个想法可能是让消费者在每次收到消息时检查他们是否已经执行了 3 个n任务,如果是,则以某种方式请求另一个消费者处理它 - 但我在 Kafka 中找不到任何合适的机制来启用此功能.

[1] https://faust.readthedocs.io/

4

1 回答 1

0

需要更多努力但会使实际处理更容易的事情是有一个预消费者,它只会等到有 3 条消息要消费,将它们打包在“元消息”中,然后发送那是一个“准备好处理”的话题。它必须像提到的@cricket_007 一样,它不应该提交,直到它实际消耗了 3 条消息并将它们生成到出站主题中。

这样一来,最终消费者就是一个极其简单的消费者。它只会从“准备处理”主题中消费,并且无论何时它收到一条消息,您都知道它将拥有您需要的 3 个事件。您只需处理它们,然后再等待一个小时,直到您可以再次轮询。无需与任何其他消费者协调。

于 2020-01-23T18:30:55.940 回答