1

我尝试通过 Thruway 设置一个 websocket 服务器,它可以管理多个组。类似于聊天应用程序,每个客户端可以同时订阅一个或多个,并将消息广播到整个聊天室。我设法用一个古老版本的 Ratchet 做到了这一点,但由于它运行不太顺畅,我想切换到 Thruway。可悲的是,我找不到任何可以管理群组的东西。到目前为止,我作为 websocket-manager 拥有以下内容,并且客户端正在使用当前版本的 Autobahn|js (18.x)。

如果可以通过以下方式管理订阅组,是否有人知道?

<?php

require_once __DIR__.'/../vendor/autoload.php';

use Thruway\Peer\Router;
use Thruway\Transport\RatchetTransportProvider;

$router = new Router();
$router->addTransportProvider(new RatchetTransportProvider("0.0.0.0", 9090));

$router->start();
4

1 回答 1

5

使用 ThruWay,情况与旧 Ratchet 略有不同。首先,Thruway 不是 WAMP 服务器。它只是一个路由器。所以它没有像旧的 Rathcet 那样的服务器实例,它可以让你包装所有的服务器端功能。但它只会获取消息包,并将route它们发送到同一领域中的其他会话,具体取决于它们的订阅。如果您曾经使用过 socket.io,领域的想法类似于不同的连接,因此您可以将会话或连接限制到单个命名空间或拆分不同套接字实例(如管理、访问者等)的功能。

在高速公路(最新版本)的客户端上,一旦您订阅了一个主题,然后在该主题中发布,thruway 将自动检测主题订阅者并在同一领域向他们发送消息。但是在旧的棘轮中,您需要通过保留一系列可用频道来手动处理此问题,并在用户订阅时将用户添加到每个频道,并通过迭代这些用户在主题中向这些用户广播消息。这真的很痛苦。

如果你想在服务器端使用 RPC 调用并且不想在客户端包含一些你的东西,你仍然可以在服务器端使用一个名为 internalClient 的类。从概念上讲,内部客户端是另一个连接到您的高速公路客户端的会话,并在内部处理一些功能而不暴露其他客户端。它接收消息包并在其中执行操作,然后将结果返回给请求的客户端连接。我花了一段时间才理解它是如何工作的,但一旦我弄清楚背后的想法就更有​​意义了。

这么少的代码可以更好地解释,

在您的路由器实例中,您将需要添加一个模块,(请注意,在 voxys/thruway 包中的示例对内部客户端几乎没有混淆)

服务器.php

require __DIR__ . "/../bootstrap.php";
require __DIR__ . '/InternalClient.php';

$port = 8080;
$output->writeln([
    sprintf('Starting Sockets Service on Port [%s]', $port),
]);
$router = new Router();

$router->registerModule(new RatchetTransportProvider("127.0.0.1", $port));   // use 0.0.0.0 if you want to expose outside world

// common realm ( realm1 )
$router->registerModule(
    new InternalClient()    // instantiate the Socket class now
);

// administration realm (administration)
// $router->registerModule(new \AdminClient());

$router->start();

这将初始化 Thruway 路由器并将 internalclient 实例附加到它。现在在 InternalClient.php 文件中,您将能够访问实际路由以及当前连接的客户端。在他们提供的示例中,路由器不是实例的一部分,因此您只能使用新连接的会话 id 属性。

内部客户端.php

<?php

use Thruway\Module\RouterModuleInterface;
use Thruway\Peer\Client;
use Thruway\Peer\Router;
use Thruway\Peer\RouterInterface;
use Thruway\Logging\Logger;
use React\EventLoop\LoopInterface;

class InternalClient extends Client implements RouterModuleInterface
{
    protected $_router;

    /**
     * Contructor
     */
    public function __construct()
    {
        parent::__construct("realm1");
    }

    /**
     * @param RouterInterface $router
     * @param LoopInterface $loop
     */
    public function initModule(RouterInterface $router, LoopInterface $loop)
    {
        $this->_router = $router;

        $this->setLoop($loop);

        $this->_router->addInternalClient($this);
    }

    /**
     * @param \Thruway\ClientSession $session
     * @param \Thruway\Transport\TransportInterface $transport
     */
    public function onSessionStart($session, $transport)
    {
        // TODO: now that the session has started, setup the stuff

        echo "--------------- Hello from InternalClient ------------\n";
        $session->register('com.example.getphpversion', [$this, 'getPhpVersion']);

        $session->subscribe('wamp.metaevent.session.on_join',  [$this, 'onSessionJoin']);
        $session->subscribe('wamp.metaevent.session.on_leave', [$this, 'onSessionLeave']);
    }

    /**
     * Handle on new session joined.
     * This is where session is initially created and client is connected to socket server
     *
     * @param array $args
     * @param array $kwArgs
     * @param array $options
     * @return void
     */
    public function onSessionJoin($args, $kwArgs, $options) {
        $sessionId = $args && $args[0];
        $connectedClientSession = $this->_router->getSessionBySessionId($sessionId);
        Logger::debug($this, 'Client '. $sessionId. ' connected');
    }

    /**
     * Handle on session left.
     *
     * @param array $args
     * @param array $kwArgs
     * @param array $options
     * @return void
     */
    public function onSessionLeave($args, $kwArgs, $options) {

        $sessionId = $args && $args[0];

        Logger::debug($this, 'Client '. $sessionId. ' left');

        // Below won't work because once this event is triggered, client session is already ended
        // and cleared from router. If you need to access closed session, you may need to implement
        // a cache service such as Redis to access data manually.
        //$connectedClientSession = $this->_router->getSessionBySessionId($sessionId); 
    }

    /**
     * RPC Call messages
     * These methods will run internally when it is called from another client. 
     */
    private function getPhpVersion() {

        // You can emit or broadcast another message in this case
        $this->emitMessage('com.example.commonTopic', 'phpVersion', array('msg'=> phpVersion()));

        $this->broadcastMessage('com.example.anotherTopic', 'phpVersionRequested', array('msg'=> phpVersion()));

        // and return result of your rpc call back to requester
        return [phpversion()];
    }

    /**
     * @return Router
     */
    public function getRouter()
    {
        return $this->_router;
    }


    /**
     * @param $topic
     * @param $eventName
     * @param $msg
     * @param null $exclude
     */
    protected function broadcastMessage($topic, $eventName, $msg)
    {
        $this->emitMessage($topic, $eventName, $msg, false);
    }

    /**
     * @param $topic
     * @param $eventName
     * @param $msg
     * @param null $exclude
     */
    protected function emitMessage($topic, $eventName, $msg, $exclude = true)
    {
        $this->session->publish($topic, array($eventName), array('data' => $msg), array('exclude_me' => $exclude));
    }

}

在上面的示例代码中需要注意的几件事, - 为了接收主题中的消息,在客户端,您需要订阅该主题。- 内部客户端可以发布/发出/广播任何主题,无需在同一领域进行任何订阅。- 广播/发射功能不是原始高速公路的一部分,我想出一些东西来使我的出版物更容易一些。emit 将向所有订阅主题的人发送消息包,发送者除外。另一方面,广播不会排除发件人。

我希望这些信息对理解这个概念有所帮助。

于 2018-05-07T17:44:19.390 回答