我已经搜索了该信息(包括文档),但找不到。
我正在使用最新版本的php-amqplib和 RabbitMQ v. 2.7.1。我有三个队列和三个交换:
// Declare the exchanges
$this->channel->exchange_declare(self::EXCHANGE_TO_PROCESS, 'direct', false, true, false, false, false);
$this->channel->exchange_declare(self::EXCHANGE_WAITING, 'direct', false, true, false, false, false);
$this->channel->exchange_declare(self::EXCHANGE_TO_CLEAN, 'direct', false, true, false, false, false);
// Messages in the to_process queue are sent to to_clean after 24 hours without being processed
$this->channel->queue_declare(self::QUEUE_TO_PROCESS, false, true, false, false, false, array(
'x-dead-letter-exchange' => array('S', self::EXCHANGE_TO_CLEAN),
'x-message-ttl' => array('I', 86400000), // 1 day in milli-seconds
));
// Messages in the waiting queue are sent to to_process after 5 minutes (wait period before retry)
$this->channel->queue_declare(self::QUEUE_WAITING, false, true, false, false, false, array(
'x-dead-letter-exchange' => array('S', self::EXCHANGE_TO_PROCESS),
'x-message-ttl' => array('I', 300000), // 5 minutes in milli-seconds
));
// Messages in the to_clean queue are kept until they are processed
$this->channel->queue_declare(self::QUEUE_TO_CLEAN, false, true, false, false, false);
// Bind the queues to the exchanges
$this->channel->queue_bind(self::QUEUE_TO_PROCESS, self::EXCHANGE_TO_PROCESS);
$this->channel->queue_bind(self::QUEUE_TO_CLEAN, self::EXCHANGE_TO_CLEAN);
$this->channel->queue_bind(self::QUEUE_WAITING, self::EXCHANGE_WAITING);
行为非常简单:消息发布到EXCHANGE_TO_PROCESS
. 外部工作人员处理消息:如果处理正常,则消息仅被确认,因此从队列中删除(这部分工作正常);如果处理出错,则将消息插入到EXCHANGE_WAITING
where,在 5 分钟的 TTL 后,通过 DLX 将其重新插入到EXCHANGE_TO_PROCESS
重新处理中。但是,在第三次失败之后,它会被插入到EXCHANGE_TO_CLEAN
cron 作业将出现的地方并清理消息、日志错误等。
然而,我遇到的问题是代码清楚QUEUE_WAITING
地将命令。5 分钟结束后,该消息随即消失。我不太清楚为什么。EXCHANGE_WAITING
QUEUE_TO_PROCESS
QUEUE_WAITING
所有这一切都让我们想到了我的问题:死信交换是否将参数中的交换隐式绑定到队列?并且:我丢失的消息可能会发生什么?
编辑
我比以前更困惑。我尝试了以下非常基本的代码:
$this->channel->exchange_declare('exchangeA', 'fanout', false, true, false, false, false);
$this->channel->exchange_declare('exchangeB', 'fanout', false, true, false, false, false);
$this->channel->queue_declare('queueA', false, true, false, false, false, array(
'x-dead-letter-exchange' => array('S', 'exchangeB'),
'x-message-ttl' => array('I', 5000)
));
$this->channel->queue_declare('queueB', false, true, false, false, false);
$this->channel->queue_bind('queueA', 'exchangeA');
$this->channel->queue_bind('queueB', 'exchangeB');
$msg = new AMQPMessage('hello!');
$this->channel->basic_publish($msg, 'exchangeA');
这创建了两个队列和两个交换(我已经看到它们以fanout
避免打扰路由键),将 queueA 绑定到 exchangeA 并将 queueB 绑定到 exchangeB,在 queueA 上设置一个 TTL 并将其 DLX 到 exchangeB。观察管理页面中发生的情况,我看到一条消息在 queueA 中花费了 5 秒,正如预期的那样,然后消息消失了,就像我上面更复杂的代码一样。