我的 RabbitMQ 消费者必须以固定大小的批次处理消息,预取计数为 15。为了匹配 AWS SES 电子邮件发送速率,这限制为 15 - 消费者进程必须并行发出 SES API 请求。处理部分最终批次的最佳方式是什么,即留下少于 15 条消息的批次。
任何新消息都会添加到消费者的批处理数组中。当达到批大小时,将处理该批并为该批中的所有消息发回确认。
最后一批中的“剩余消息”必须在 10 秒连接超时生效之前在我的场景中处理。有没有办法在回调函数上实现超时,这样,如果在一段时间内接收到的消息数量少于预取计数,则处理并确认消费者临时批处理数组中的剩余消息。值得一提的是,当消费者进程正在执行时,不会有新消息发布到队列中。提前致谢。
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS);
$channel = $connection->channel();
$channel->queue_declare('test_queue', true, false, false, false);
$channel->basic_qos(null, 15, null);
$callback = function($message){
Ack_Globals::$msg_batch[] = $message;
Ack_Globals::$msg_count++;
$time_diff = Ack_Globals::$cur_time-Ack_Globals::$lbatch_recieved;
if(sizeof(Ack_Globals::$msg_batch) >= 15){
$time_end = microtime(true);
Ack_Globals::$lbatch_recieved = $time_end;
Ack_Globals::$batch_count = Ack_Globals::$batch_count + 1;
//Calculate the average time to create this batch
Ack_Globals::$bgen_time_avg = ($time_end - Ack_Globals::$time_start)/Ack_Globals::$batch_count;
//Process this batch
/* Process */
//Acknowledge this batch
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag'], true);
echo "\nMessage Count: ".Ack_Globals::$msg_count;
echo "\nSize of Array: ".sizeof(Ack_Globals::$msg_batch);
echo "\nLast batch received: ".Ack_Globals::$lbatch_recieved;
echo "\nBatch ".Ack_Globals::$batch_count." processed.";
echo "\nAverage batch generation time: ". Ack_Globals::$bgen_time_avg;
//Clear the batch array
Ack_Globals::$msg_batch = array();
}else{}
};
if ((Ack_Globals::$batch_count === 0) && (Ack_Globals::$msg_count === 0)){
//initialise the timer
Ack_Globals::$time_start = microtime(true);
}
$channel->basic_consume('int_surveys', '', false, false, false, false, $callback);
while ($channel->is_consuming()){
Ack_Globals::$cur_time = AMQPChannel::$current_time;
$channel->wait(null, false, 10);
}
$channel->close();
$connection->close();