3

我刚刚开始使用 php-amqplib 和 RabbitMQ,并且想要一种方法来处理无论出于何种原因无法处理且无法处理的消息。我认为人们处理这个问题的一种方法是使用死信队列。我正在尝试设置它,但到目前为止还没有任何运气,希望有人可以提供一些建议。

我的队列启动看起来有点像:

class BaseAbstract
{
    /** @var AMQPStreamConnection */
    protected $connection;
    /** @var AMQPChannel */
    protected $channel;
    /** @var array */
    protected $deadLetter = [
        'exchange' => 'dead_letter',
        'type' => 'direct',
        'queue' => 'delay_queue',
        'ttl' => 10000 // in milliseconds
    ];

    protected function initConnection(array $config)
    {
        try {
            $this->connection = AMQPStreamConnection::create_connection($config);
            $this->channel = $this->connection->channel();

            // Setup dead letter exchange and queue
            $this->channel->exchange_declare($this->deadLetter['exchange'], $this->deadLetter['type'], false, true, false);
            $this->channel->queue_declare($this->deadLetter['queue'], false, true, false, false, false, new AMQPTable([
                'x-dead-letter-exchange' => $this->deadLetter['exchange'],
                'x-dead-letter-routing-key' => $this->deadLetter['queue'],
                'x-message-ttl' => $this->deadLetter['ttl']
            ]));
            $this->channel->queue_bind($this->deadLetter['queue'], $this->deadLetter['exchange']);

            // Set up regular exchange and queue
            $this->channel->exchange_declare($this->getExchangeName(), $this->getExchangeType(), true, true, false);
            $this->channel->queue_declare($this->getQueueName(), true, true, false, false, new AMQPTable([
                'x-dead-letter-exchange' => $this->deadLetter['exchange'],
                'x-dead-letter-routing-key' => $this->deadLetter['queue']
            ]));

            if (method_exists($this, 'getRouteKey')) {
                $this->channel->queue_bind($this->getQueueName(), $this->getExchangeName(), $this->getRouteKey());
            } else {
                $this->channel->queue_bind($this->getQueueName(), $this->getExchangeName());
            }
        } catch (\Exception $e) {
            throw new \RuntimeException('Cannot connect to the RabbitMQ service: ' . $e->getMessage());
        }
        return $this;
    }

    // ...
}

我认为应该设置我的死信交换和队列,然后还设置我的常规交换和队列(使用扩展类提供的 getRouteKey、getQueueName 和 getExchangeName/Type 方法)

当我尝试处理如下消息时:

public function process(AMQPMessage $message)
{
    $msg = json_decode($message->body);
    if (empty($msg->payload) || empty($msg->payload->run)) {
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, true);
        return;
    }

    // removed for post brevity, but compose $cmd variable

    exec($cmd, $output, $returned);
    if ($returned !== 0) {
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    } else {
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
    }
}

但我得到了错误Something went wrong: Cannot connect to the RabbitMQ service: PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'delay_queue' in vhost '/': received 'dead_letter' but current is ''

这是我应该设置死字的方式吗?我看到的不同示例似乎都显示了一些不同的处理方式,但似乎都不适合我。所以我显然在这里误解了一些东西,我很感激任何建议。:)

4

1 回答 1

3

设置(永久)队列和交换是你想要做的事情,在部署代码,而不是每次你想使用它们时。将它们想象成您的数据库模式 - 尽管协议提供“声明”而不是“创建”,但您通常应该编写假设事物以特定方式配置的代码。您可以将代码的第一部分构建到设置脚本中,或者使用基于 Web 和 CLI 的管理插件使用简单的 JSON 格式来管理这些。

您看到的错误可能是尝试在不同时间使用不同参数声明同一队列的结果 - “声明”不会替换或重新配置现有队列,它会将参数视为“前提条件”检查。您需要删除并重新创建队列,或通过管理 UI 对其进行管理,以更改其现有参数。

当您想在代理中动态创建项目时,运行时声明变得更加有用。您可以为他们提供您知道对于该目的唯一的名称,或者null作为名称传递以接收随机生成的名称(人们有时会提到创建一个“匿名队列”,但 RabbitMQ 中的每个队列都有一个名称,甚至如果你没有选择它)。


如果我没看错,你的“模式”看起来像这样:

# Dead Letter eXchange and Queue
Exchange: DLX
Queue: DLQ; dead letter exchange: DLX, with key "DLQ"; automatic expiry
Binding: copy messages arriving in DLX to DLQ

# Regular eXchange and Queue
Exchange: RX
Queue: RQ; dead letter exchange: DLX, with key "DLQ"
Binding: copy messages from RX to RQ, optionally filtered by routing key

当消息在 RQ 中“nacked”时,它将被传递到 DLX,其路由键被覆盖为“DLQ”。然后它将被复制到 DLQ。如果它从 DLQ 被 nack,或者在该队列中等待太久,它将被路由到它自己。

我会以两种方式简化:

  • 从“死信队列”(我将其标记为 DLQ)中删除死信交换和 TTL;该循环可能比有用更令人困惑。
  • x-dead-letter-routing-key从常规队列(我标记为 RQ)中删除该选项。常规队列的配置不需要知道死信交换是有零个、一个还是多个附加到它的队列,因此不应该知道其他队列的名称。如果您希望 nacked 消息直接进入一个队列,只需将其设置为“扇出交换”(忽略路由键)或绑定键设置为的“主题交换” #(这是一个匹配所有路由键的通配符)。

另一种方法可能是设置x-dead-letter-routing-key常规队列的名称,即标记它来自哪个队列。但在你有一个用例之前,我会保持简单,并使用其原始路由密钥留下消息。

于 2018-11-07T13:48:51.187 回答