0

我的 RabbitMQ 消费者必须以固定大小的批次处理消息,预取计数为 15。为了匹配 AWS SES 电子邮件发送速率,这限制为 15 - 消费者进程必须并行发出 SES API 请求。处理部分最终批次的最佳方式是什么,即留下少于 15 条消息的批次。

任何新消息都会添加到消费者的批处理数组中。当达到批大小时,将处理该批并为该批中的所有消息发回确认。

最后一批中的“剩余消息”必须在 10 秒连接超时生效之前在我的场景中处理。有没有办法在回调函数上实现超时,这样,如果在一段时间内接收到的消息数量少于预取计数,则处理并确认消费者临时批处理数组中的剩余消息。值得一提的是,当消费者进程正在执行时,不会有新消息发布到队列中。提前致谢。

    $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS);
    $channel = $connection->channel();
    $channel->queue_declare('test_queue', true, false, false, false);
    $channel->basic_qos(null, 15, null);

    $callback = function($message){
        Ack_Globals::$msg_batch[] = $message;
        Ack_Globals::$msg_count++;
        $time_diff = Ack_Globals::$cur_time-Ack_Globals::$lbatch_recieved;
        if(sizeof(Ack_Globals::$msg_batch) >= 15){
            $time_end = microtime(true);
            Ack_Globals::$lbatch_recieved = $time_end;
            Ack_Globals::$batch_count = Ack_Globals::$batch_count + 1;

            //Calculate the average time to create this batch
            Ack_Globals::$bgen_time_avg = ($time_end - Ack_Globals::$time_start)/Ack_Globals::$batch_count;

            //Process this batch
            /* Process */

            //Acknowledge this batch
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag'], true);

            echo "\nMessage Count: ".Ack_Globals::$msg_count;
            echo "\nSize of Array: ".sizeof(Ack_Globals::$msg_batch);
            echo "\nLast batch received: ".Ack_Globals::$lbatch_recieved;
            echo "\nBatch ".Ack_Globals::$batch_count." processed.";
            echo "\nAverage batch generation time: ". Ack_Globals::$bgen_time_avg;
            
            //Clear the batch array
            Ack_Globals::$msg_batch = array();
        }else{}
    };


    if ((Ack_Globals::$batch_count === 0) && (Ack_Globals::$msg_count === 0)){
        //initialise the timer
        Ack_Globals::$time_start = microtime(true);
    }

    $channel->basic_consume('int_surveys', '', false, false, false, false, $callback);

    while ($channel->is_consuming()){

        Ack_Globals::$cur_time = AMQPChannel::$current_time;
        $channel->wait(null, false, 10);
    }
    $channel->close();
    $connection->close();
4

0 回答 0