0

I have 1000 queues with specific names. so I want to process these queues with one broker. is it possible?

the queue names is stored in mysql db so I should fetch theme and run the broker for each one. and of course it should run asynchronously and should be able to pass the queued item to a idle broker. is this possible? or I should make 1000 files with specific queue names as brokers?

Update: this is a picture of my queues. the queues should run in a parallel manner not a serial one. so the users are producer and the worker is consumer that runs the send_message() method;

enter image description here

4

1 回答 1

2

我可以向您展示如何使用enqueue库。我必须警告你,没有办法在一个进程中异步消费消息。尽管您可以运行一些服务于一组队列的进程。它们可以按队列重要性分组。

安装 AMQP 传输和消费库:

composer require enqueue/amqp-ext enqueue/enqueue

创建消费脚本。我假设您已经从数据库中获取了一组队列名称。它们存储在$queueNamesvar 中。该示例将相同的处理器绑定到所有队列,但您当然可以设置不同的处理器。

<?php

use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;

// here's the list of queue names which you fetched from DB
$queueNames = ['foo_queue', 'bar_queue', 'baz_queue'];

$factory = new AmqpConnectionFactory('amqp://');

$context = $factory->createContext();

// create queues at RabbitMQ side, you can remove it if you do not need it
foreach ($queueNames as $queueName) {
    $queue = $context->createQueue($queueName);
    $queue->addFlag(AMQP_DURABLE);

    $context->declareQueue($queue);
}

$consumer = new QueueConsumer($context);

foreach ($queueNames as $queueName) {
    $consumer->bind($queueName, function(PsrMessage $psrMessage) use ($queueName) {
        echo 'Consume the message from queue: '.$queueName;

        // your processing logic.

        return PsrProcessor::ACK;
    });
}

$consumer->consume();

更多在文档

于 2017-06-20T13:40:21.093 回答