我是 RabbitMQ 的新手。我正在使用带有 codeigniter 的 php-amqplib 库,并且仍然想知道我缺乏的一些知识。
- 为什么
$channel->wait()
使用? - 为什么它总是驻留在一个无限的while循环中?
- 如何/我可以绕过 Infinite while 循环。
就像在我的项目的一个用户想要向 100k 潜在客户广播新活动的情况下,如果第二个用户有大约 100 封邮件要发送,第二个用户就会受到影响,第二个用户必须等待 100k 邮件先送达,然后是最后一个用户轮到他了。
我需要一个并发消费者的解决方案,他们工作顺利而不影响其他人
这是我的代码片段:
public function campaign2(){
$this->load->library('mylibrary');
for( $i=1;$i<=5;$i++ ) {
$url = "http://localhost/myproject/rabbit/waiting";
$param = array('index' => $i);
$this->waiting($i);
}
}
public function waiting($i)
{
ini_set('memory_limit','400M');
ini_set('max_execution_time', 0);
ini_set('display_errors', 1);
${'conn_'.$i} = connectRabbit();
${'channel_'.$i} = ${'conn_'.$i}->channel();
${'channel_'.$i}->exchange_declare('ha-local-campaign-'.$i.'-exchange', 'fanout', false, true, false);
$q = populateQueueName('campaign-'.$i);
${'channel_'.$i}->queue_declare($q, false, true, false, false);
${'channel_'.$i}->queue_bind($q, 'ha-local-campaign-'.$i.'-exchange', 'priority.'.$i);
$consumer_tag = 'campaign_consumer' ;
function process_message($msg) {
echo 'Mail Sent';
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
function shutdown($channel, $conn){
echo '['.date('H:i:s').'] Campaign consumer - Shutdown!!';
}
${'channel_'.$i}->basic_consume($q, $consumer_tag, false, false, true, false,'process_message');
while(1) {
${'channel_'.$i}->wait();
}
register_shutdown_function('shutdown', ${'channel_'.$i}, ${'conn_'.$i});
}
如果有人能指导我完成整个过程,我将不胜感激。