0

我有一个发布大量消息的队列(~10K)。连接到此队列的是多个消费者,使用该php-amqplib库在 codeigniter 中使用以下代码

public function processQueue()
{
    // Make connection
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    // Make channel
    $channel = $connection->channel();
    // Declare queue
    $channel->queue_declare(QUEUE_NAME, false, false, false, false);

    // PHP callable
    $callback = function ($msg) {

        //DO MESSAGE PROCESSING HERE

        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

    };

    $channel->basic_consume(AGENTS_QUEUE_PROCESSING, '', false, true, false, false, $callback);

    // While queue is empty, wait
    while (count($channel->callbacks)) {
        // Wait
        $channel->wait();
    }

    // Close channel and connection
    $channel->close();
    $connection->close();
}

消息被填满并同时被多个这样的消费者消费。我观察到剩余一些 5-6k 消息(即在消耗大约 4-5k 条消息之后),队列突然变空,消费者空闲并等待更多消息。此外,此时 RabbitMQ 管理 Web 面板上的消息总数突然下降。

我试过用耐用参数制作队列,但问题似乎是一样的。可能是什么问题及其解决方案?

4

0 回答 0