0

我目前有一个 swoole websocket 服务器,我通过前端进行通信。所以消息是由前端发送的,在 swoole websocket 服务器中的 onMessage() 方法中受到威胁,服务器向前端发送响应。因为我想做一个 swoole websocket 服务器集群,所以我改变了它的工作方式,现在消息由前面发送,进入 onMessage() 方法,在 rabbitMQ 通道中发送消息,我使用命令(Symfony)启动rabbitMQ 频道上的监听器。这就是我卡住的地方,我试图创建一个 swoole 客户端以将 rabbitMQ 中的消息发送到 swoole 服务器,但它没有进入 onMessage() 方法。

所以问题是:这是一个好方法吗?为什么消息没有进入 onMessage() ?如何识别消息是来自前端还是来自 swoole 客户端?

这是我所做的命令的代码:

class RabbitmqStartCommand extends Command
{
    /**
     * @var string
     */
    protected static $defaultName = 'app:rabbitmq:start';

    /**
     * @var MessageService
     */
    private $messageService;

    public function __construct(MessageService $messageService, string $name = null)
    {
        $this->messageService   = $messageService;
        parent::__construct($name);
    }

    protected function configure()
    {
        $this
            ->setDescription('This command will launch the rabbitMQ listener')
        ;
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io   = new SymfonyStyle($input, $output);

        try {
            $this->startRabbitMqListener();
        } catch (\Exception $e) {
            $io->error('Something went wrong when trying to launch rabbitMQ listener. Message: '.$e->getMessage());
        }

        $io->success('Goodbye');

        return 0;
    }

    private function startRabbitMqListener()
    {
        $connection = new AMQPStreamConnection('rabbitmq', 5672, 'guest', 'guest');
        $channel    = $connection->channel();

        $channel->queue_declare('swoole_message', false, false, false, false);

        echo "[*] Waiting for messages. To exit press CTRL+C\n";

        $callback = function ($msg) {
            $client  = new Client(SWOOLE_SOCK_TCP);
            $client->connect('swoole', 9503);
            $request = $client->send($msg->body);
            $client->close();
        };

        $channel->basic_consume('swoole_message', '', false, true, false, false, $callback);

        while ($channel->is_open()) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }
4

0 回答 0