7

我正在尝试读取未使用 Symfony Messenger 发送的排队消息(在 RabbitMQ 中)。Messenger似乎添加了一些标题,例如

headers: 
    type: App\Message\Transaction

但是在读取外部消息时,此标头不存在。

那么,有没有办法告诉 Messenger 队列 A 中的每条消息都必须被视为消息类型Transaction

我今天拥有的是:

framework:
    messenger:
        transports:
            # Uncomment the following line to enable a transport named "amqp"
            amqp:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: messages
                        type: direct
                    queue:
                        name: queue_messages

        routing:
            # Route your messages to the transports
             'App\Message\Transaction': amqp

我想补充的是:

        routing:
            # Route your messages to the transports
             amqp: 'App\Message\Transaction'
4

1 回答 1

3

Ryan Weaver 在 symfony 的 slack 上回答了一个类似的问题

如果消息不是来自 messenger,您将需要一个自定义的 messenger 序列化程序:)

1)您创建一个自定义序列化(从 Messenger 实现 SerializerInterface)并在 Messenger 配置下配置它

2) 不知何故,在那个序列化程序中,您将 JSON 转换为您在代码中拥有的一些“消息”对象。你如何做到这一点取决于你——你需要能够以某种方式查看你的 JSON 并找出它应该映射到哪个消息类。然后您可以手动创建该对象并填充数据,或者使用 Symfony 的序列化程序。在退回之前将其包裹在信封中

3) 因为您的序列化程序现在正在返回一个“消息”对象(如果有的话),Messenger 使用其正常逻辑来查找该消息的处理程序并执行它们


我为自己的需要做了一个快速的实现,由你来适应你的业务逻辑

1 - 创建一个Serializer实现SerializerInterface


   // I keeped the default serializer, and just override his decode method.

   /**
     * {@inheritdoc}
     */
    public function decode(array $encodedEnvelope): Envelope
    {
        if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) {
            throw new InvalidArgumentException('Encoded envelope should have at least a "body" and some "headers".');
        }

        if (empty($encodedEnvelope['headers']['action'])) {
            throw new InvalidArgumentException('Encoded envelope does not have an "action" header.');
        }

        // Call a factory to return the Message Class associate with the action
        if (!$messageClass = $this->messageFactory->getMessageClass($encodedEnvelope['headers']['action'])) {
            throw new InvalidArgumentException(sprintf('"%s" is not a valid action.', $encodedEnvelope['headers']['action']));
        }

        // ... keep the default Serializer logic

        return new Envelope($message, ...$stamps);
    }

Message2 -使用工厂检索权利:

class MessageFactory
{
    /**
     * @param string $action
     * @return string|null
     */
    public function getMessageClass(string $action)
    {
        switch($action){
            case ActionConstants::POST_MESSAGE :
                return PostMessage::class ;
            default:
                return null;
        }
    }
}

3) 为 messenger 配置新的自定义序列化程序:

framework:
  messenger:
    serializer: 'app.my_custom_serializer'

我会尝试更进一步,找到一种直接“连接”队列的方法,让您知道。

于 2019-04-09T19:12:38.303 回答