1

我有一个如下定义的函数:

public function subscribe($someQueue)
{
    $callback = function($msg){
        return $msg->body;
    };
    $this->channel->basic_consume( $someQueue, '', FALSE, TRUE, FALSE, FALSE, $callback);
    while(count($this->channel->callbacks)) {
         $this->channel->wait();
    }
}

我正在使用以下功能:

注意:以下行位于不同的类文件中,因此创建了包含上述函数的类的对象。

$objRMQ = new RabbitMQ();
$msgBody = $objRMQ->subscribe("someQueue");
echo "message body returned from someMethod: ".$msgBody; 

基本上,我想将每条消息的正文返回给发布到队列的调用者函数。

电流输出:

message body returned from subscribe: NULL

预期输出:

holla, this is your message from queue
4

1 回答 1

0

由于这个问题很老但仍然没有答案,我将给出一个简短的解释。您现在可能已经想出了答案,但这可能会帮助其他人在未来进行搜索。

这里的关键概念是“异步执行”。

当您使用该方法订阅频道时basic_consume,您并不是要求立即执行一次回调,而是要求在消息可用时执行它,然后每次有另一条消息可用时执行。

wait()对于 AMQPLib,您通过重复调用该方法来等待新消息;即在这里:

while(count($this->channel->callbacks)) {
     $this->channel->wait();
}

仔细想想,你的代码有两个错误:

  • 线路无处return $msg->body归。调用将发生在方法实现的某个深处,并且您没有从 得到任何输出,因此无法对该返回值做任何事情。wait()$this->channel->wait()
  • 另一方面,当您$objRMQ->subscribe("someQueue")从其他类调用时,您期望它返回一些东西,但该函数没有return声明。唯一的return语句在您传递给的匿名函数中basic_consume

解决方案基本上是在回调中完成对消息的所有echo $msg->body处理,或者您想要执行的任何实际处理。如果您真的想在消息进入时收集数据,您可以将其保存到回调外部可访问的某个变量中,但请记住,您有时需要跳出循环才能对这些数据执行任何操作。wait()

于 2016-02-16T19:03:04.603 回答