我有一个发布大量消息的队列(~10K)。连接到此队列的是多个消费者,使用该php-amqplib
库在 codeigniter 中使用以下代码
public function processQueue()
{
// Make connection
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// Make channel
$channel = $connection->channel();
// Declare queue
$channel->queue_declare(QUEUE_NAME, false, false, false, false);
// PHP callable
$callback = function ($msg) {
//DO MESSAGE PROCESSING HERE
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume(AGENTS_QUEUE_PROCESSING, '', false, true, false, false, $callback);
// While queue is empty, wait
while (count($channel->callbacks)) {
// Wait
$channel->wait();
}
// Close channel and connection
$channel->close();
$connection->close();
}
消息被填满并同时被多个这样的消费者消费。我观察到剩余一些 5-6k 消息(即在消耗大约 4-5k 条消息之后),队列突然变空,消费者空闲并等待更多消息。此外,此时 RabbitMQ 管理 Web 面板上的消息总数突然下降。
我试过用耐用参数制作队列,但问题似乎是一样的。可能是什么问题及其解决方案?