1

假设您有多个生产者和一个消费者想要接收来自所有可用发布者的持久消息。

生产者以不同的速度工作。假设系统 A 产生 10 个请求/秒,系统 B 产生 1 个请求/秒。因此,如果您使用唯一的队列,您将处理来自 A 的 10 条消息,然后处理来自 B 的 1 条消息。

但是,如果您想平衡负载并处理来自 A 的一条消息,然后处理来自 B 的一条消息等呢?从多个队列消费不是一个好的选择,因为在这种情况下我们不能使用通配符绑定。

更新:
每个生产者排队似乎是最好的方法。生产者不知道他们不断变化的速度。每个消费者拥有一个队列,我可以订阅一个主题并接收来自所有可用发布者的消息。但是每个生产者都有一个队列,我需要自己编写逻辑:

  1. 通过管理插件获取所有可用队列(AMQP 不允许列出队列)。
  2. 按队列名称过滤。
  3. 实施循环策略。
  4. 实施通知机制以订阅随时可能出现的新发布者。
  5. 当发布者消失并且客户端读取所有消息时删除不必要的队列。

好吧,这看起来很简单,但我认为代理可以提供所有这些功能而无需任何编码。如果有一个队列,我只创建一个持久队列,将其绑定到主题交换,然后启动任意数量的发布者向主题发送消息。这个选项几乎是开箱即用的。

4

2 回答 2

1

我知道我参加聚会要迟到了,但还是。

在 Azure 服务总线术语中,它被称为“分区”,它基于分区键。最好的部分是在 Azure SB 中,接收客户端不知道分区,它只是订阅单个队列。

在 RabbitMQ 中有一个 X-Consistent-Hashing 插件(“ rabbitmq_consistent_hash_exchange ”),但不幸的是它不是那么方便。消费者必须明确配置为从特定队列消费。如果您有十个队列,那么您需要设置您的消费者,以便覆盖所有十个。

另外两个选项:

  1. 随机交换类型
  2. 分片插件

请记住,使用分片插件,即使它创建“一个要消费的逻辑队列”,您也必须拥有与虚拟队列一样多的订阅者,否则某些队列将未被使用。

于 2021-01-28T02:00:55.117 回答
0

您可以使用优先队列支持并根据生产者速度关联优先级。需要注意的是必须谨慎设置优先级(例如,如果消费者速度低于系统 B,则消费者只会消费来自 B 的消息)并且生产者必须知道他们的生产速度。

另一个需要考虑的选项是根据生产速度创建 3 种类型的队列:HIGHMEDIUMLOW。三个队列根据生产速度通过设置的绑定键绑定到交换机。可以使用它来完成。

消费者将使用循环策略从这 3 个队列中消费消息。需要注意的是,生产者必须了解他们的生产速度。

但是最好的选择可能是每个生产者排队,特别是如果生产者的速度不稳定并且无法分类。因此,生产者不需要知道他们的生产速度。

于 2015-04-23T18:20:57.177 回答