4

我有一个抽象 RabbitMQ 服务器的 Web 服务接口(不要问我为什么,我知道这是不必要的步骤,但我必须这样做)。也就是说,我通过 Web 服务调用从队列中轮询消息,而不是直接通过amqp.

通过basic.consumer阻塞执行线程直到队列中有消息。这使得 Web 服务不会返回。

说明代码:

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->queue_declare(QUEUE_NAME, false, true, false, false);
    $ret = array('body' => '');

    $callback = function($msg) use ($channel, &$ret) {
        $ret['body'] = $msg->body;
        /*
        Here I would basic.cancel the consumer if there were no messages in the queue
        */
    };

    $channel->basic_consume(QUEUE_NAME, 'tag', false, true, false, false, $callback);

    if (count($channel->callbacks)) {
        $channel->wait(); // blocks here...
    }

    return $ret;
4

2 回答 2

6

如果要获取队列的大小,可以queue_declare用php-amqlib调用,return的第二个参数是队列中的消息数。

  list($queue, $messageCount, $consumerCount) = $channel->queue_declare(QUEUE_NAME, true);

调用 queue_declare() 方法时,将 $passive 参数设置为 true 很重要

于 2017-09-06T15:59:02.010 回答
3

我想做的是通过basic.get.

在 php-amqlib 中:

$channel->basic_get(QUEUE_NAME, true); // the second arg is no_ack.

第二个参数标志着该消息不需要确认。也就是说,您不必将消息“标记”为已读,以便 RabbitMQ 自信地将其出列。排除它(设置它 = false)不会弹出顶部消息。

为什么要这么麻烦?

我将 RabbitMQ 代码包装在 http web 服务中。这不是一个好主意(至少对于我的用例而言)。当 web 服务返回时,rabbitmq 连接终止,未(尚未)确认的消息被重新排队返回队列。因此,如果您必须采用 http 包装器,请确保将 rabbitmq 连接的生命周期与 http 请求的生存时间隔离开来。但是,我没有尝试这个。

于 2015-09-08T22:51:14.703 回答