2

我正在使用https://github.com/videlalvaro/php-amqplib做一些 rabbitmq 工作:

我正在尝试创建一个阻塞版本的basic_get,(或者一个我可以重复调用并且每次只能得到一个消息的basic_consume版本),它将阻塞直到一条消息准备好然后返回它而不是返回null,如果没有在队列中。

当我尝试使用 basic_consume 获取一条消息时,事情会变得混乱,我最终会收到一堆“未准备好”但未确认的消息。(如果我以这种方式只获取一条消息,则每次都有效,如果我尝试获取 2 条消息,它有时会挂断并可以正常工作)

class Foo {
    ...
    private function blockingGet() {
            /*
                queue: Queue from where to get the messages
                consumer_tag: Consumer identifier
                no_local: Don't receive messages published by this consumer.
                no_ack: Tells the server if the consumer will acknowledge the messages.
                exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
                nowait:
                callback: A PHP Callback
             */
            $this->ch->basic_consume($this->queueName, "consumer_".$this->consumerNum++, false, false, false, false, function($msg) {
                    $this->msgCache = json_decode($msg->body);
                    $this->ch->basic_ack($msg->delivery_info['delivery_tag']);
                    $this->ch->basic_cancel($msg->delivery_info['consumer_tag']);
            });
            while (count($this->ch->callbacks)) {
                    $this->ch->wait();
            }
            return $this->msgCache;
    }
}

$q = new Foo();
for ($i = 0; $i < 5; $i++) {
    print $q->blockingGet();
}
4

1 回答 1

0

我通过将接收到的消息保存在传递给回调参数的闭包中,然后在调用$channel->basic_consume()后处理它,实现了类似于您在此处之后的操作,如果接收到消息(或者如果设置了 timeout 参数并达到了超时时间)。尝试以下方法:$channel->wait()wait()

class Foo {
    // ...
    public function __construct() {
        $this->ch->basic_consume($this->queueName, "", false, false, false, false, function($msg) {
                $this->msgCache = json_decode($msg->body);
                $this->ch->basic_ack($msg->delivery_info['delivery_tag']);
        });
    }
    // ...
    private function blockingGet() {
        $this->ch->wait();
        if ($this->msgCache) {
            $msgCache = $this->msgCache;
            $this->msgCache = null;
            return $msgCache;
        }
        return null;
    }
}

$q = new Foo();
for ($i = 0; $i < 5; $i++) {
    print $q->blockingGet();
}
于 2017-04-11T08:32:48.033 回答