3

这里有一点关于我目前的设置。

  • REST API 将 (POST) 数据推送到队列中
  • 队列有一个始终在运行的消费者,并生产到 en Exchange
  • Exchange 路由到其他几个队列(如 20+)
  • 每个(20+)队列都执行一项特定任务(消费者也总是运行)
  • Cron 作业运行以检查所有 (20+) 任务是否已完成并生成到另一个队列

我不确定我是否喜欢消费者一直运行,因为每个消费者使用大约 300MB 的 Ram(我认为它是 MB,目前不在我面前),我正在寻找另一种实现方式。

    M <-- Message coming from REST API
    |
    |
    +-First Queue
    |
    |
    | <-- The Exchange
   /|\
  / | \
 /  |  \ <-- bind to multiple queues ( 20+ )
Q1  Q2  Q3 <-- Each Queue is a task that must be completed


    | <-- CRON runs to check if all queues above have completed
    |
    |
    Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
    |
    C <-- Consumer 

我在下面的相关问题中建议使用 RPC,但这个问题是 RPC(据我了解)将有多个实例。这是一个资源密集型过程,我认为通过添加 RPC 调用它只会让服务器陷入困境,然后变得无响应(如果我错了,请纠正我)。

另一种方法是使用聚合器模式

这看起来正是我需要的,但我发现文档有限。有人做过这种模式吗?

我的问题是我对它目前的实施方式不满意,我正在寻找改进流程的方法。我希望摆脱 CRON,实施新模式,而不是让消费者一直运行。

该流程目前也只支持每个消费者的单个实例。它可以有多个消费者,但我们如何实现它,我们当时只想要一个。

这是使用 RabbitMQBundle 在 PHP、Symfony2 框架中实现的

相关问题:

4

1 回答 1

5

OldSound 这里是 RabbitMQ Bundle 的创建者。

捆绑包本身不支持开箱即用的聚合器模式,但您可以使用底层 php-amqplib 来实现它。

要执行聚合,您需要沿处理链发布具有相关 ID 和该 ID 的线程的消息。然后聚合器将等待 X 条消息,具体取决于您必须处理该特定任务的不同工作人员的数量。等待消息的一种方法是拥有一个数组,如果它们被相关 id 索引,则保留它们。

因此,每当您收到传入消息时,您将执行以下操作:

$correlation_id = $msg->get('correlation_id');
$this->receivedMessages[$correlation_id]['msgs'][] = $msg;

然后在你做的某个地方:

if ($someNumber == count($this->receivedMessages[$correlation_id]['msgs']) {
// proceed to next step
}

我现在实际上正在为 Symfony 开发一个 Workflow 包,我计划很快将其开源。该捆绑包可用于以非常简单的方式实现您提出的用例(即您只需要为每个任务提供服务)。

现在我想知道为什么每个消费者要占用 300 MB 的 RAM?您是否需要与它们一起运行完整的堆栈框架?如果可能的话,为消费者应用程序创建一个新的 Symfony 内核,并且只加载你需要的内容以减少开销。

于 2013-01-30T19:32:35.063 回答