3

如果处理结果不适合我,我如何将消息返回到队列。仅找到有关消息确认的信息,但我认为它不适合我。我需要,如果作为处理的结果,我将参数 RETRY 消息添加回队列。然后这个工人或另一个工人再次拿起它并尝试处理它。

例如:

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();

$channel->queue_declare('test', false, false, false, false);

$callback = function($msg) {
    $condition = json_decode($msg->body);

    if (!$condition) {
        # return to the queue
    }
};

$channel->basic_consume('test', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>
4

2 回答 2

5

将自动no_ack标志设置为 false

queue:从哪里获取消息的队列
consumer_tag:消费者标识符
no_local:不接收此消费者发布的消息。
no_ack:告诉服务器消费者是否会确认消息。
Exclusive:请求独占消费者访问,意味着只有这个消费者可以访问队列
nowait:
回调:一个PHP回调

$channel->basic_consume('test', '', false, false, false, false, $callback);

您必须使用确认,如果您的过程不起作用,您可以忽略 ack

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();

$channel->queue_declare('test', false, false, false, false);

$callback = function($message) {
    $condition = json_decode($message->body);
     
    if (!$condition) {
        // return to the queue 
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
    }else{
        // send ack , remove from queue
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    }
};

$channel->basic_consume('test', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

当然使用这种方法你会遇到消息总是在队列的头部,还有另一种可能,如果你真的想有一个重试的轨迹你可以按照下面的方法

定义一个重试队列,最好是您的队列名称-retry,并最好定义一个死信队列:-dlq

然后您可以执行以下操作: 如何设置-retry队列:这是其中最重要的部分。您需要声明具有以下功能的队列:

x-dead-letter-exchange:应与您的主队列路由键相同
x-dead-letter-routing-key:应与您的主队列路由键相同
x-message-ttl:重试之间的延迟

这些代码是 sudo 代码,请不要复制粘贴,这只是一个提示,给你一个关于它的想法

$maximumRetry = 5;
$callback = function($message) {
    $body = json_decode($message->body);
    try { 
        // process result is your condition
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    } catch(Exception $e) {
        // return to the queue 
        $body['try_attempt'] = !empty($body['try_attempt'])? int($body['try_attempt']) + 1: 1 
        if ($body['try_attempt'] >= $maximumRetry ){
            $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
            return
        }
        $msg = new AMQPMessage(json_encode($message));

        $channel->basic_publish($msg, '', 'test-retry');
    }
};

我们需要 3 个队列来重新绑定。

  • queue.example

    • 绑定:
      • 交换:queue.exchange
      • 路由:queue.example
    • 特征:
      • x-dead-letter-exchange: queue.exchange
      • x-dead-letter-routing-key: queue.example-dlq
  • queue.example-dlq

    • 绑定:
      • 交换:queue.exchange
      • 路由:queue.example-dlq
  • queue.example-重试

    • 绑定:
      • 交换:queue.exchange
      • 路由:queue.example-retry
    • 特征:
      • x-dead-letter-exchange: queue.exchange
      • x-dead-letter-routing-key: queue.example-added
      • x-消息-ttl:10000
于 2017-09-18T13:11:33.860 回答
0

结果证明这个解决方案比我想象的要容易,事实证明这个任务并不是专门针对 RabbitMQ,而是关于变量的范围。如果有人对解决方案感兴趣,请点击此处:

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$connection = new AMQPStreamConnection($AMQP);
$channel = $connection->channel();

$channel->queue_declare('test', false, false, false, false);

$callback = function($msg) {
  global $channel;

  $condition = json_decode($msg->body);

  if (!$condition) {
    $msg = new AMQPMessage(json_encode(array(
      'condition' => false
    )));

    $channel->basic_publish($msg, '', 'test');
  }
};

$channel->basic_consume('test', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
  $channel->wait();
}

$channel->close();
$connection->close();
?>
于 2017-09-18T01:49:01.700 回答