1

我已经搜索了该信息(包括文档),但找不到。

我正在使用最新版本的php-amqplib和 RabbitMQ v. 2.7.1。我有三个队列和三个交换:

// Declare the exchanges
$this->channel->exchange_declare(self::EXCHANGE_TO_PROCESS, 'direct', false, true, false, false, false);
$this->channel->exchange_declare(self::EXCHANGE_WAITING, 'direct', false, true, false, false, false);
$this->channel->exchange_declare(self::EXCHANGE_TO_CLEAN, 'direct', false, true, false, false, false);

// Messages in the to_process queue are sent to to_clean after 24 hours without being processed
$this->channel->queue_declare(self::QUEUE_TO_PROCESS, false, true, false, false, false, array(
    'x-dead-letter-exchange' => array('S', self::EXCHANGE_TO_CLEAN),
    'x-message-ttl' => array('I', 86400000), // 1 day in milli-seconds
));

// Messages in the waiting queue are sent to to_process after 5 minutes (wait period before retry)
$this->channel->queue_declare(self::QUEUE_WAITING, false, true, false, false, false, array(
    'x-dead-letter-exchange' => array('S', self::EXCHANGE_TO_PROCESS),
    'x-message-ttl' => array('I', 300000), // 5 minutes in milli-seconds
));

// Messages in the to_clean queue are kept until they are processed
$this->channel->queue_declare(self::QUEUE_TO_CLEAN, false, true, false, false, false);

// Bind the queues to the exchanges
$this->channel->queue_bind(self::QUEUE_TO_PROCESS, self::EXCHANGE_TO_PROCESS);
$this->channel->queue_bind(self::QUEUE_TO_CLEAN, self::EXCHANGE_TO_CLEAN);
$this->channel->queue_bind(self::QUEUE_WAITING, self::EXCHANGE_WAITING);

行为非常简单:消息发布到EXCHANGE_TO_PROCESS. 外部工作人员处理消息:如果处理正常,则消息仅被确认,因此从队列中删除(这部分工作正常);如果处理出错,则将消息插入到EXCHANGE_WAITINGwhere,在 5 分钟的 TTL 后,通过 DLX 将其重新插入到EXCHANGE_TO_PROCESS重新处理中。但是,在第三次失败之后,它会被插入到EXCHANGE_TO_CLEANcron 作业将出现的地方并清理消息、日志错误等。

然而,我遇到的问题是代码清楚QUEUE_WAITING地将命令。5 分钟结束后,该消息随即消失。我不太清楚为什么。EXCHANGE_WAITINGQUEUE_TO_PROCESSQUEUE_WAITING

所有这一切都让我们想到了我的问题:死信交换是否将参数中的交换隐式绑定到队列?并且:我丢失的消息可能会发生什么?

编辑

我比以前更困惑。我尝试了以下非常基本的代码:

    $this->channel->exchange_declare('exchangeA', 'fanout', false, true, false, false, false);
    $this->channel->exchange_declare('exchangeB', 'fanout', false, true, false, false, false);
    $this->channel->queue_declare('queueA', false, true, false, false, false, array(
        'x-dead-letter-exchange' => array('S', 'exchangeB'),
        'x-message-ttl' => array('I', 5000)
    ));
    $this->channel->queue_declare('queueB', false, true, false, false, false);
    $this->channel->queue_bind('queueA', 'exchangeA');
    $this->channel->queue_bind('queueB', 'exchangeB');

    $msg = new AMQPMessage('hello!');
    $this->channel->basic_publish($msg, 'exchangeA');

这创建了两个队列和两个交换(我已经看到它们以fanout避免打扰路由键),将 queueA 绑定到 exchangeA 并将 queueB 绑定到 exchangeB,在 queueA 上设置一个 TTL 并将其 DLX 到 exchangeB。观察管理页面中发生的情况,我看到一条消息在 queueA 中花费了 5 秒,正如预期的那样,然后消息消失了,就像我上面更复杂的代码一样。

4

2 回答 2

3

看起来您的消息流可能会被循环。如果是这样,RabbitMQ 将按照官方文档(路由死信消息部分)中指定的方式静默丢弃消息:

有可能形成死信队列的循环。例如,当队列死信消息发送到默认交换时,可能会发生这种情况,而没有指定死信路由键。如果整个周期是由于消息到期而导致的,则此类周期中的消息(即两次到达同一队列的消息)将被丢弃。

要解决骑车问题,您必须选择一种选择:

  1. 完全打破循环(在某个时候丢弃消息)。
  2. 或者使用来自停滞队列的消息并根据您的工作流程手动重新发布它们。

这会给您的应用程序带来一些复杂性,但这是您必须为性能和稳定性付出的代价。

PS:

我挖掘了 RabbitMQ 邮件列表,发现了类似的问题,比如你的 - Dead Letter, TTL and Cycle

目前,您必须使用并重新发布以清除 RabbitMQ 之外的标头。

于 2014-06-05T15:28:03.740 回答
0

当我偶然发现这个博客并看到海报上的案例与我们的案例非常相似时,我怀疑是否有问题,他说它工作正常......所以我开始挖掘更多。

我们遇到的问题只是版本问题。有人告诉我 RabbitMQ 软件包是最新的,但我们使用的是 Ubuntu 12.04 LTS,所以“最新”版本是 2.7.1 - 一个超过 3 年的版本。

如果您的情况与我们相同(使用较旧的发行版),请查看 RabbitMQ 的下载页面并选择适合您发行版的那个。在Ubuntu的情况下,我们只是添加了官方 repo(您也可以简单地下载 .dpkg 文件),执行apt-get update并等待服务器重新启动。之后,上面的代码几乎可以正常工作。

于 2014-06-09T15:50:59.593 回答