12

我有多个分布式竞争消费者,每个消费者都从同一个(事务性)队列中提取消息。我想将每个消费者实现为幂等接收器,因此即使重复到达,我也永远不会多次处理相同的消息(跨所有消费者)。我怎样才能与多个消费者一起实现这一目标?

我的第一个想法是在将它们放入队列之前以某种方式为每条消息生成一个连续的序列号,然后使用共享数据库表来协调消费者之间的工作。即消费者#1 处理msg#1,然后将一行写入数据库表,说'msg#1 已处理'(希望它在数据库中以确保持久性)。当消费者准备好处理消息时,它会查看队列中的下一个可用消息,查阅共享数据库表并确定这是否是按顺序排列的下一个消息。如果是这样,它会将其从队列中拉出。如果不是,它会忽略它。

这样,我只需要存储处理的最后一条消息(因为所有消息都有一个连续的序列号),我不需要使用缓冲区存储所有接收到的消息的 ID,并以协商的“窗口”大小,并且消息总是被串行处理(这是我想要的这个场景)。

只是好奇是否有更好的方法?我担心每当我需要处理消息时查询数据库的成本。

如果答案是“它取决于框架”,那么我想到了 MSMQ

4

3 回答 3

9

我通过确保每条消息都有一个 GUID 或其他唯一标识符,然后将其记录在与您更改持久性存储中的状态相同的事务中,从而实现了幂等消息。

对于每条消息,您现在可以检查唯一 ID 是否存在于您的持久性存储中。

If the unique id exists, you know it was processed previously and state changes were persisted in the same transaction.

If the unique id does not exist, you know that it has never been processed.

If two consumers process the same message, because your table where you store your processed unique id's has a unique constraint, when it comes time for both consumers to commit their transactions, one of them must fail and rollback all of it's changes while the other will succeed.

于 2018-05-11T21:55:44.470 回答
3

幂等接收者的要点是,如果消息被处理多次,则无关紧要。因此,幂等接收者不需要以某种方式检测消息是重复的,他们可以像往常一样简单地处理它......

所以要么你的接收者不是幂等的,要么你不必要地担心......

于 2009-11-03T18:24:59.093 回答
-2

安德鲁 -

另一种选择是查看您的队列如何处理消息。有队列在消息被消费者拾取后删除消息。这是队列的典型行为,不难找到具有这种功能的队列。这应该为您提供一个简单的解决方案,而不是为每个消费者构建一种方法来确保他们不会收到已经被另一个消费者处理过的消息。

最好的,亨利

于 2011-12-23T16:46:19.710 回答