2

是否可以在 Symfony Messenger 组件中定义交换之间的绑定?(此处为 4.4 版本)。

我知道有可能像这里一样绑定到队列交换:

transports:
            incoming:
                dsn: "%env(RABBITMQ_SHARED_URL)%"
                options:
                    queues:
                        app.pl_incoming_events:
                            binding_keys:
                                - pl.app.#
                    exchange:
                        name: my_app.incoming
                        type: topic

然后应用程序设置它们之间的交换、队列和绑定。我希望基于路由键将交换绑定到另一个交换具有相同的效果。

我知道我可以使用rabbitmq-bundle,但IMO 是多余的——我想保留一个组件来管理rabbitMQ。

例如,我想根据某些路由键将交换绑定other_app到交换。my_app.incoming

4

1 回答 1

3

Messenger 不是 RabbitMQ 管理器,您甚至不能在开箱即用的同一传输中声明多个交换。但是由于它具有所有必需的组件,并且在这种情况下 symfony 对配置有点宽容,您可以滥用系统并自己构建它。

由于我不知道您的要求,我会保持基本,希望它能让您入门。

从创建开始AmqpTransportFactory

// src/Amqp/AmqpTransportFactory.php
namespace App\Amqp;

use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class AmqpTransportFactory implements TransportFactoryInterface
{
    public function createTransport(
        string $dsn, array $options, SerializerInterface $serializer
    ): TransportInterface
    {
        unset($options['transport_name']);
        $exchanges['name'] = $options['exchange']['name'];
        $exchanges['bindings'] = $options['exchange']['bindings'] ?? [];

        // Passing unknown options is deprecated in 5.1
        unset($options['exchange']['bindings']);

        $connection = Connection::fromDsn($dsn, $options);

        // Ensure our exchange is created first
        $connection->exchange()->declareExchange();
        $channel = $connection->channel();
    
        // This is normally done in the Connection, but is harder to override
        $this->createExchanges($channel, $exchanges);

        return new AmqpTransport($connection, $serializer);
    }

    public function supports(string $dsn, array $options): bool
    {
        return 0 === strpos($dsn, 'amqp://');
    }

    private function createExchanges(\AMQPChannel $channel, array $configuration): void
    {
        $factory = new AmqpFactory();

        foreach ($configuration['bindings'] as $exchange_name => $arguments) {
            $exchange = $factory->createExchange($channel);
            $exchange->setName($exchange_name);
            $exchange->setType($arguments['type'] ?? \AMQP_EX_TYPE_FANOUT);
            $exchange->setFlags($arguments['flags'] ?? \AMQP_DURABLE);
            $exchange->declareExchange();

            if (!is_array($arguments['binding_keys'])) {
                $arguments['binding_keys'] = [$arguments['binding_keys']];
            }

            foreach ($arguments['binding_keys'] as $key) {
                $exchange->bind($configuration['name'], $key);
            }
        }
    }
}

注册服务:

# config/services.yaml
services:
  messenger.transport.amqp.factory:
    class: App\Amqp\AmqpTransportFactory

将新配置添加到交换:

# config/packages/messenger.yaml
exchange:
  name: my_app.incoming
  type: topic
  bindings:
    other_app:
      type: direct
      binding_keys: ['route']

它将导致以下绑定:

+-----------------+------------------------+------------------------+
|     source      |      destination       |      routing_key       |
+-----------------+------------------------+------------------------+
| my_app.incoming | app.pl_incoming_events | pl.app.#               |
| my_app.incoming | other_app              | route                  |
+-----------------+------------------------+------------------------+
于 2020-10-22T23:14:36.237 回答