考虑以下两种模式之一:“消息序列”或“重新排序器”。
消息序列的用途如下:
每当可能需要将大量数据分解为消息大小的块时,将数据作为消息序列发送,并用序列标识字段标记每条消息。
Resequencer有点不同:
使用有状态过滤器 Resequencer 来收集和重新排序消息,以便它们可以按指定顺序发布到输出通道。
Resequencer 可以接收可能未按顺序到达的消息流。Resequencer 包含在内部缓冲区中以存储乱序消息,直到获得完整的序列。然后将按顺序的消息发布到输出通道。输出通道保持顺序很重要,这样可以保证消息按顺序到达下一个组件。与大多数其他路由器一样,Resequencer 通常不会修改消息内容。
我强烈建议阅读 Gregor Hohpe/Bobby Woolf 所著的“企业集成模式”一书中的这些模式(来自 Martin Fowler 等人的贡献)
这里有比我能描述的更多的细节,但本质上消息序列取决于序列标识符、位置和“结束”(布尔字段)。您需要一个位于队列末尾的适配器来处理序列。
另一方面,定序器“将乱序消息存储在内部缓冲区中,直到获得完整的序列,然后以正确的序列将消息发布到输出通道”(第 285 页)。
这与您当前使用的“转储到数据库,然后有一个工人拉动”策略非常相似。
这些模式的实现细节将基于您的应用程序语言和队列选择(在您的情况下为 RabbitMQ),但这些模式已经非常完善,所以我会好好看看这些。
我不知道 RabbitMQ 本身有任何内置机制可以帮助您获得它。
希望这可以帮助。
编辑
我用谷歌搜索“RabbitMQ resequencer”并找到以下内容(虽然不能保证其功效):rabbus-sequence (GitHub)。
无论如何,可能有助于查看他们的代码在那里做什么,看看是否可以从中获得一些灵感。