8

假设您的系统中有一个实体,例如“Person”,并且您想要处理修改各种 Person 实体的事件。重要的是:

  • 同一个 Person 的事件按 FIFO 顺序处理
  • 多个 Person 事件流由不同的线程/进程并行处理

我们有一个使用共享数据库和锁来解决这个问题的实现。线程竞争获取一个 Person 的锁,然后在获取锁后按顺序处理事件。我们想移动到消息队列以避免轮询和锁定,我们认为这会减少数据库的负载并简化消费者代码的实现。

我已经对 ActiveMQ、RabbitMQ 和 HornetQ 进行了一些研究,但我没有看到明显的实现方法。

ActiveMQ 支持消费者订阅通配符,但我没有看到将每个队列上的并发限制为 1 的方法。如果我能做到这一点,那么解决方案将很简单:

  • 不知何故,告诉代理允许所有以 /queue/person 开头的队列的并发性为 1。
  • 发布者使用队列名称中的人员 ID 将事件写入队列。例如:/queue/person.20
  • 消费者使用通配符订阅队列:/queue/person。>
  • 每个消费者将收到不同人员队列的消息。如果所有人员队列都在使用中,一些消费者可能会闲置,这没关系
  • 处理完一条消息后,消费者发送一个 ACK​​,它告诉代理它已经处理完该消息,并允许将该 Person 队列的另一条消息发送给另一个消费者(可能是同一个)

ActiveMQ 接近了:您可以进行通配符订阅并启用“独占消费者”,但这种组合会导致单个消费者接收发送到所有匹配队列的所有消息,从而将所有人员的并发性降低到 1。我觉得我错过了一些明显的东西。

问题:

  • 有没有办法通过任何主要的消息队列实现来实现上述方法?我们对选择持相当开放的态度。唯一的要求是它在 Linux 上运行。
  • 有没有不同的方法来解决我没有考虑的一般问题?

谢谢!

4

3 回答 3

4

看起来 JMSXGroupID 是我正在寻找的。来自 ActiveMQ 文档:

http://activemq.apache.org/message-groups.html

他们的股票价格示例用例正是我所追求的。我唯一担心的是如果单个消费者死亡会发生什么。希望代理能够检测到这一点并选择另一个消费者与该组 ID 相关联。

于 2011-04-15T19:58:40.503 回答
1

解决此问题的一种通用方法(如果我的问题正确的话)是为 Person 引入一些唯一属性(例如,Person 的数据库级 id)并使用该属性的哈希作为 FIFO 队列的索引以将该 Person 放入。
由于该属性的散列可能很大(您负担不起 2^32 个队列/线程),因此仅使用该散列的 N 个最低有效位。每个 FIFO 队列都应该有专门的工作人员来处理它——瞧,你的要求得到了满足!

这种方法有一个缺点——您的 Persons 必须具有分布良好的 id 才能使所有队列以或多或少相等的负载工作。如果您不能保证,请考虑使用轮询队列集并跟踪现在正在处理哪些人员,以确保对同一人员进行顺序处理。

于 2011-04-15T19:32:15.813 回答
0

如果您已经有一个允许共享锁的系统,为什么不为每个队列设置一个锁,消费者在从队列中读取之前必须获取这些锁?

于 2011-04-15T17:58:08.400 回答