6

I'm trying to use the Delayed Message Queue for RabbitMQ from PHP, but my messages are simply disappearing.

I'm declaring the exchange with the following code:

$this->channel->exchange_declare(
    'delay',
    'x-delayed-message',
    false,  /* passive, create if exchange doesn't exist */
    true,   /* durable, persist through server reboots */
    false,  /* autodelete */
    false,  /* internal */
    false,  /* nowait */
    ['x-delayed-type' => ['S', 'direct']]);

I'm binding the queue with this code:

$this->channel->queue_declare(
    $queueName,
    false,  /* Passive */
    true,   /* Durable */
    false,  /* Exclusive */
    false   /* Auto Delete */
);
$this->channel->queue_bind($queueName, "delay", $queueName);

And I'm publishing a message with this code:

$msg = new AMQPMessage(json_encode($msgData), [
    'delivery_mode' => 2,
    'x-delay' => 5000]);
$this->channel->basic_publish($msg, 'delay', $queueName);

But the message doesn't get delayed; it's still immediately delivered. What am I missing?

4

3 回答 3

5

这里开始

消息创建应该是

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$msg = new AMQPMessage($data,
            array(
                'delivery_mode' => 2, # make message persistent
                'application_headers' => new AMQPTable([
                    'x-delay' => 5000
                ])
            )
        );
于 2016-10-12T00:39:42.600 回答
4

答案适用于那些需要消息延迟但不想深入了解细节的人。你只需要几件事就可以让它工作:

安装amqp 互操作兼容传输,例如enqueue/amqp-bunnyenqueue/amqp-tools.

composer require enqueue/amqp-bunny enqueue/amqp-tools

创建 amqp 上下文,添加延迟策略并发送延迟消息:

<?php
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\AmqpBunny\AmqpConnectionFactory;

$context = (new AmqpConnectionFactory('amqp://'))->createContext();
$context->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy())

$queue = $context->createQueue('foo');
$context->declareQueue($queue);

$message = $context->createMessage('Hello world!');

$context->createProducer()
    ->setDeliveryDelay(5000) // 5 sec
    ->send($queue, $message)
;

顺便说一句,这不是唯一可用的策略。有一个基于 RabbitMQ 死信队列 + ttl。它可以以同样的方式使用。

于 2017-08-07T14:16:50.583 回答
1

您需要一个路由密钥才能从交换机发布到相关队列。

发布到内置直接交换的原因是因为此交换是使用路由键作为目标队列名称的特殊情况。

对于您创建的所有交换和队列,您需要使用路由键在交换和队列之间创建绑定。然后您使用该路由键而不是目标队列名称发布消息。

我不知道创建绑定的 PHP 代码......但它通常看起来像这样:

channel.bind(exhange_name, queue_name, routing_key)

然后在您发布消息时:

$this->channel->basic_publish($msg, 'delay', $routing_key);

于 2015-08-31T19:04:14.400 回答