2

我试图弄清楚即使没有消费者运行,是否可以将消息存储在 RabbitMQ 交换中。

我理解(可能是错误的)要实现交换需要“持久”以及队列,并且需要使用“持久”标志发送消息

'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT

我的主要目标是将所有消息存储在交换器中,以便在无论出于何种原因没有消费者运行的情况下,当我启动一个交换器时,交换器中的所有消息都可以定向到绑定队列。我声明我的交换和队列如下:

//Sender.php
public function sendToQueue(ActionMessage $message)
    {
        $headers = [
            'content-type' => 'application/json',
            'timestamp' => $message->getCreatedAt()->getTimestamp(),
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ];
        $channel = $this->connection->getChannel();
        $channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
        $qMessage = new AMQPMessage(json_encode($message->toArray()), $headers);
        $channel->basic_publish($qMessage, $this->exchangeName, $message->getTopic());
        return true;
    }
//Receiver.php
public function consume($callbackFunction)
        {
            $channel = $this->messenger->getChannel();
            $channel->exchange_declare($this->exchange, 'direct', false, true, false);
            list($queueName, ,) = $channel->queue_declare('', false, true, true, false);
            $channel->queue_bind($queueName, $this->exchangeName, $this->topicAction);

            $channel->basic_consume($queueName, '', false, true, false, false, $callbackFunction);

            while (count($channel->callbacks)) {
                $channel->wait();
            }

            $channel->close();
            $this->messenger->close();
        }

我会很感激任何帮助(即使只是放弃这个想法并在两者之间插入一些存储)。谢谢。

4

1 回答 1

1

交换器不存储消息,这是队列的工作。您遇到的问题不是没有消费者在运行,而是没有队列存在,因为您让消费者声明自己的队列。

如果您希望消息在消费者拿起它们之前一直存在,您应该声明:

  • “发件人”将发布到的交易所
  • 附加到该交换的命名队列,用于将单独使用的每种类型的消息(如果使用direct交换,则每个路由键对应一个)

这些都可以在 Sender 脚本中声明,但在大多数情况下,在部署应用程序时声明它们一次更有意义,将它们视为数据库模式。

无需在 Receiver 脚本中创建匿名队列,您只需附加到指定队列,并开始接收在那里等待的消息。

这将产生的主要区别是同一路由键的多个消费者将如何交互:

  • 与现有代码一样,附加到单个交换的多个队列会为每条消息创建多个副本。如果您有不同的消费者使用相同的消息做不同的事情,这将很有用。
  • 正如我在上面所建议的,连接到单个队列的多个消费者将共享消息,每个消息基本上由不同的消费者随机处理。如果您有多个相同的消费者来处理大量消息,这很有用。

您可能会发现这个 RabbitMQ 模拟器对可视化差异很有用。

你可能会发现你实际上想要一个混合物:

  • 为每个必须查看每条消息的消费者预先声明一个队列,以确保存储每条消息的副本,直到该特定消费者准备好阅读它。
  • 在额外的消费者中声明额外的临时队列,以便在消息进入时获取额外的消息副本

最后一点,RabbitMQ 中有两种机制可以回退到对无法处理的消息进行不同的处理:

  • 备用交换捕获将从交换中丢弃的消息(因为没有适当的队列绑定)。
  • 死信交换捕获将从队列中丢弃的消息(例如,因为它被消费者拒绝,或达到配置的超时)。

如果您实际上不想正常处理丢失的消息,而只想检测它们,例如将它们列在错误日志中,则 AE 在您的示例中可能很有用。

于 2019-03-26T13:43:52.443 回答