0

我有一个用例,我必须使用一种排队机制来确保消息由消费者(“工人”)按顺序处理,一个接一个。

我过去使用过 RabbitMQ,它保证了接收消息的顺序。但是,如果该顺序不正确怎么办?

假设我提交消息 [4, 5, 3, 2, 1],RabbitMQ 消费者将按该顺序处理消息。如果我希望它们按 [1, 2, 3, 4, 5] 的顺序处理,因为消息相互依赖,该怎么办?

另外,我不想让消费者在消息 2 被确认之前消费消息 3(没有间隙)。

是否有任何支持此用例的排队解决方案?目前,我们将消息转储到数据库中,并让工作人员按顺序定期提取数据。

4

1 回答 1

1

考虑以下两种模式之一:“消息序列”或“重新排序器”。

消息序列的用途如下:

每当可能需要将大量数据分解为消息大小的块时,将数据作为消息序列发送,并用序列标识字段标记每条消息。

Resequencer有点不同:

使用有状态过滤器 Resequencer 来收集和重新排序消息,以便它们可以按指定顺序发布到输出通道。

Resequencer 可以接收可能未按顺序到达的消息流。Resequencer 包含在内部缓冲区中以存储乱序消息,直到获得完整的序列。然后将按顺序的消息发布到输出通道。输出通道保持顺序很重要,这样可以保证消息按顺序到达下一个组件。与大多数其他路由器一样,Resequencer 通常不会修改消息内容。

强烈建议阅读 Gregor Hohpe/Bobby Woolf 所著的“企业集成模式”一书中的这些模式(来自 Martin Fowler 等人的贡献)

这里有比我能描述的更多的细节,但本质上消息序列取决于序列标识符、位置和“结束”(布尔字段)。您需要一个位于队列末尾的适配器来处理序列。

另一方面,定序器“将乱序消息存储在内部缓冲区中,直到获得完整的序列,然后以正确的序列将消息发布到输出通道”(第 285 页)。

这与您当前使用的“转储到数据库,然后有一个工人拉动”策略非常相似。

这些模式的实现细节将基于您的应用程序语言和队列选择(在您的情况下为 RabbitMQ),但这些模式已经非常完善,所以我会好好看看这些。

我不知道 RabbitMQ 本身有任何内置机制可以帮助您获得它。

希望这可以帮助。

编辑

我用谷歌搜索“RabbitMQ resequencer”并找到以下内容(虽然不能保证其功效):rabbus-sequence (GitHub)

无论如何,可能有助于查看他们的代码在那里做什么,看看是否可以从中获得一些灵感。

于 2017-10-26T22:25:01.653 回答