5

我是 RabbitMQ 的新手。我正在使用带有 codeigniter 的 php-amqplib 库,并且仍然想知道我缺乏的一些知识。

  • 为什么$channel->wait()使用?
  • 为什么它总是驻留在一个无限的while循环中?
  • 如何/我可以绕过 Infinite while 循环。

就像在我的项目的一个用户想要向 100k 潜在客户广播新活动的情况下,如果第二个用户有大约 100 封邮件要发送,第二个用户就会受到影响,第二个用户必须等待 100k 邮件先送达,然后是最后一个用户轮到他了。

我需要一个并发消费者的解决方案,他们工作顺利而不影响其他人

这是我的代码片段:

public function campaign2(){
        $this->load->library('mylibrary');
        for( $i=1;$i<=5;$i++ ) {
            $url = "http://localhost/myproject/rabbit/waiting";
            $param = array('index' => $i);
            $this->waiting($i);
        }          
}

public function waiting($i)
    {
        ini_set('memory_limit','400M');
        ini_set('max_execution_time', 0);
        ini_set('display_errors', 1);

        ${'conn_'.$i} = connectRabbit();
        ${'channel_'.$i} = ${'conn_'.$i}->channel();
        ${'channel_'.$i}->exchange_declare('ha-local-campaign-'.$i.'-exchange', 'fanout', false, true, false);
        $q    = populateQueueName('campaign-'.$i);
        ${'channel_'.$i}->queue_declare($q, false, true, false, false); 
        ${'channel_'.$i}->queue_bind($q, 'ha-local-campaign-'.$i.'-exchange', 'priority.'.$i);
        $consumer_tag = 'campaign_consumer' ;
        function process_message($msg) {
            echo 'Mail Sent';
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        }
        function shutdown($channel, $conn){
            echo '['.date('H:i:s').'] Campaign consumer - Shutdown!!'; 
        }

        ${'channel_'.$i}->basic_consume($q, $consumer_tag, false, false, true, false,'process_message');
        while(1) {
           ${'channel_'.$i}->wait();
        }
        register_shutdown_function('shutdown', ${'channel_'.$i}, ${'conn_'.$i});  
    }

如果有人能指导我完成整个过程,我将不胜感激。

4

1 回答 1

4

当你打电话时,$channel->wait()你是:

  • 检查通道的队列以查看是否有待处理的消息。

  • 对于每条消息,您将为相应通道的回调调用已注册的回调

从“你好世界的例子”,一步一步::

// First, you define `$callback` as a function receiving
// one parameter (the _message_).
$callback = function($msg) {
  echo " [x] Received ", $msg->body, "\n";
};

// Then, you assign `$callback` the the "hello" queue.
$channel->basic_consume('hello', '', false, true, false, false, $callback);

// Finally: While I have any callbacks defined for the channel, 
while(count($channel->callbacks)) {
    // inspect the queue and call the corresponding callbacks
    //passing the message as a parameter
    $channel->wait();
}
// This is an infinite loop: if there are any callbacks,
// it'll run forever unless you interrupt script's execution.

让您的第二个用户发送使用不同的队列。您可以拥有任意数量的队列。

于 2017-01-20T16:58:43.187 回答