2

我正在编写一个 WebSocket 服务器。虽然握手成功并且服务器可以使用 RFC 标准发送编码数据,但 socket_select() 只能在连接新客户端时检测更改,而不是在客户端向服务器发送数据时检测到更改。怎么了?

/**
 * Single-process WebSocket (protocol version 13) server built on ext/sockets.
 *
 * Every accepted browser connection is paired with a TCP stream to a backend
 * listening on tcp://127.0.0.1:10000. Text received from the browser is
 * forwarded to that backend from a forked child process, and the backend's
 * reply is sent back to the browser as a WebSocket text frame.
 *
 * Relies on a Client class defined elsewhere that exposes get_id(),
 * get_socket(), get_stream_socket(), get_handshake()/set_handshake(),
 * get_pid()/set_pid() and get_initial()/set_initial().
 */
class Server{
    private $address;
    private $port;
    private $master;
    private $sockets;
    private $stream_sockets;
    private $clients;
    private $verbose_mode;


    /**
     * Creates, binds and starts listening on the master socket.
     *
     * If the listening socket cannot be set up, the failure is logged and no
     * master socket is registered, which makes run() return immediately
     * instead of spinning on an unusable socket.
     *
     * @param string $address      Address to bind to.
     * @param int    $port         TCP port to listen on.
     * @param bool   $verbose_mode When true, console() echoes log lines.
     */
    function __construct($address = '127.0.0.1', $port = 5001, $verbose_mode = true){
        $this->address = $address;
        $this->port = $port;
        $this->verbose_mode = $verbose_mode;
        // Initialise the pools so later foreach/array_search calls never see null.
        $this->sockets = array();
        $this->stream_sockets = array();
        $this->clients = array();
        $this->console("Socket server is starting...");

        //socket creation
        $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        // Check creation first: every call below needs a valid socket. Comparing
        // against false works for both resource (PHP 5/7) and object (PHP 8) sockets.
        if($socket === false)
        {
            $this->console("Failed to create socket: ".socket_strerror(socket_last_error()));
            return;
        }
        if(!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1))
        {
            $this->console("Failed to set socket option: ".socket_strerror(socket_last_error($socket)));
        }
        if(!socket_set_nonblock($socket))
        {
            $this->console("Failed to set socket nonblock: ".socket_strerror(socket_last_error($socket)));
        }
        if(!socket_bind($socket, $this->address, $this->port))
        {
            $this->console("Failed to bind socket: ".socket_strerror(socket_last_error($socket)));
            socket_close($socket);
            return;
        }
        if(!socket_listen($socket, 20))
        {
            $this->console("Failed to listen: ".socket_strerror(socket_last_error($socket)));
            socket_close($socket);
            return;
        }
        $this->master = $socket; //add current socket resource as master socket
        $this->sockets = array($socket);//add current resource to all sockets
        $this->console("Socket server started on {$this->address}:{$this->port}");
    }

    /**
     * Main loop: waits (up to one second per iteration) for readable sockets,
     * accepts new connections on the master socket and dispatches client data.
     *
     * The loop ends when the master socket is no longer registered or when
     * socket_select() fails with anything other than an interrupted call.
     */
    public function run(){
        $this->console("Start running...");
        while(isset($this->sockets[0])){
            $this->reap_children();

            // socket_select() overwrites its arguments, so rebuild them every pass.
            $changed_sockets = $this->sockets;
            $write = null;
            $except = null;
            $result = socket_select($changed_sockets, $write, $except, 1 ,0);
            if($result === false)
            {
                $errno = socket_last_error();
                socket_clear_error();
                if($errno === SOCKET_EINTR)
                {
                    continue;//interrupted by a signal, simply retry
                }
                $this->console("Failed to select: ".socket_strerror($errno));
                break;
            }
            if($result > 0)
            {
                $this->console("number of sockets: ".count($this->sockets));
                $this->console("number of changed sockets: ".$result);
                foreach($changed_sockets as $socket)
                {
                    if($socket === $this->master)//self changed
                    {
                        if(($accepted_socket = socket_accept($this->master))!== false)//incoming connection
                        {
                            $this->connect($accepted_socket);//add as a client to the pool with info to be tracked without handshaking at first
                        }
                        else
                        {
                            $this->console("Socket error: ".socket_strerror(socket_last_error($this->master)));
                        }
                    }
                    else //must be others in sockets pool
                    {
                        $this->handle_readable($socket);
                    }
                }
            }
        }
    }

    /**
     * Reads pending data from a readable client socket and routes it to the
     * handshake, to read(), or to disconnect() on EOF / receive error.
     *
     * @param mixed $socket A client socket reported readable by socket_select().
     */
    private function handle_readable($socket){
        $this->console("Finding socket associated with the client...");
        $client = $this->get_client_by_socket($socket); //get client object to track them using socket that they are associated with
        if(!$client)
        {
            return;
        }

        $this->console("receiving data from the client.");
        $data = '';
        $bytes = @socket_recv($socket, $data, 2048, MSG_DONTWAIT);
        if($bytes === false)
        {
            $errno = socket_last_error($socket);
            socket_clear_error($socket);
            if($errno === SOCKET_EAGAIN || $errno === SOCKET_EWOULDBLOCK)
            {
                return;//spurious wake-up: nothing to read yet
            }
            $this->console("Socket error: ".socket_strerror($errno));
            $this->disconnect($client);
            return;
        }
        $this->console("byte size received: $bytes");

        // EOF must be handled before anything else; otherwise a peer that closes
        // before handshaking stays in the pool and is reported readable forever.
        if($bytes === 0)
        {
            $this->disconnect($client);
        }
        else if(!$client->get_handshake())//check if handshaking has done
        {
            $this->console("handshaking...");
            $this->handshake($client, $data);//handshaking if it is not done previously
        }
        else
        {
            $this->console("incoming data from client {$client->get_id()}");
            $this->read($client, $data);//read from client if there are changes in sockets
        }
    }

    /**
     * Collects the exit status of finished child processes without blocking,
     * so forked workers do not accumulate as zombies.
     */
    private function reap_children(){
        if(!function_exists('pcntl_waitpid'))
        {
            return;
        }
        while(pcntl_waitpid(-1, $status, WNOHANG) > 0)
        {
            //keep reaping until no exited child is left
        }
    }

    /**
     * Forwards a client message to the backend stream and relays the backend's
     * length-prefixed reply back to the client. Blocks until a reply arrives or
     * the backend closes the stream.
     *
     * @param Client $client Client whose backend stream is used.
     * @param string $read   Unmasked text received from the client.
     */
    private function slingshot($client, $read){
        $stream = $client->get_stream_socket();
        $send="00:00:00:00".",DM,SAY,0,".$read;
        fwrite($stream, $send);//output to apollo
        //workaround for apollo
        if($client->get_initial())
        {
            $initial = 7;
            $continue = 0;
        }
        else
        {
            $initial = 8;
            $continue = 1;
        }
        while(TRUE)
        {
            //input from iris
            $num = fgets($stream, $initial);//$number of words
            if($num === false && feof($stream))
            {
                // Backend closed the connection: stop instead of looping forever.
                $this->console("Backend closed the stream for client #{$client->get_id()}");
                break;
            }
            // Explicit integer cast keeps the PHP 5/7 comparison semantics on PHP 8.
            $count = (int) ltrim((string) $num);
            if($count > 0)
            {
                $res = fgets($stream, $count+1);
                if($res !== false && $res != "")
                {
                    // NOTE(review): the original called fgets($fp, 1) here with an
                    // undefined $fp; a length of 1 reads zero bytes, so the call
                    // consumed nothing and has been dropped.
                    $client->set_initial(false);
                    $res = $num.$res;
                    $res = substr($res,6+$continue);
                    //output to client
                    $message = rtrim((string) $res);
                    $this->send($client, $message);
                    break;
                }
            }
        }
    }

    /**
     * Handles one frame from a handshaken client: either terminates the
     * client's worker process or forks a new worker that talks to the backend.
     *
     * @param Client $client   Sender of the frame.
     * @param string $received Raw (masked) frame bytes.
     */
    private function read($client, $received){
        $read = $this->unmask($received);
        $this->console("received from client: ".$read);
        if($read === "##client exit##") {
            $pid = (int) $client->get_pid();
            // posix_kill(0, ...) would signal the whole process group (including
            // this server), so only signal a real, previously stored child pid.
            if($pid > 0)
            {
                $this->console("Killing a child process");
                posix_kill($pid, SIGTERM);
                $this->console("Process {$pid} is terminated.");
            }
        }
        else
        {
            $this->console("start a child process");
            $pid = pcntl_fork();
            if($pid == -1)
            {
                die('could not fork.');
            }
            else if($pid)
            {
                $client->set_pid($pid);
            }
            else
            {
                //we are the child
                $this->slingshot($client, $read);
                // The child must not fall back into the parent's select loop.
                exit(0);
            }
        }
    }

    /**
     * Removes a client from all pools and closes its sockets.
     *
     * @param Client $client Client to drop.
     */
    private function disconnect($client){
        $this->console("Disconnecting client #{$client->get_id()}");
        // array_search() returns false when nothing matches, and false >= 0 is
        // true in PHP, so the result must be compared strictly against false;
        // otherwise a miss would splice index 0, i.e. the master socket.
        $i = array_search($client, $this->clients, true);//search client in clients pool
        $j = array_search($client->get_socket(), $this->sockets, true);//search client's socket in socket pool
        if($j !== false)
        {
            array_splice($this->sockets, $j, 1);
            socket_close($client->get_socket());
            $this->console("Socket closed.");
        }

        // Release the paired backend stream as well so it does not leak.
        $stream = $client->get_stream_socket();
        $k = array_search($stream, $this->stream_sockets, true);
        if($k !== false)
        {
            array_splice($this->stream_sockets, $k, 1);
            if(is_resource($stream))
            {
                fclose($stream);
            }
        }

        if($i !== false)
        {
            array_splice($this->clients, $i, 1);
        }
        $this->console("Client #{$client->get_id()} disconnected.");
    }

    /**
     * Decodes the payload of a masked client-to-server WebSocket frame.
     *
     * @param string $payload Raw frame bytes as received from the socket.
     * @return string Unmasked payload, or '' when the frame is too short to
     *                contain a header and masking key.
     */
    private function unmask($payload) {
        $payload = (string) $payload;
        if(strlen($payload) < 2)
        {
            return '';
        }

        // Offset of the 4-byte masking key depends on the length encoding:
        // 7-bit length -> 2, 16-bit extended -> 4, 64-bit extended -> 10.
        $length = ord($payload[1]) & 127;
        if($length == 126)
        {
            $offset = 4;
        }
        elseif($length == 127)
        {
            $offset = 10;
        }
        else
        {
            $offset = 2;
        }

        $masks = (string) substr($payload, $offset, 4);
        if(strlen($masks) < 4)
        {
            return '';
        }
        $data = (string) substr($payload, $offset + 4);

        $text = '';
        $size = strlen($data);
        for ($i = 0; $i < $size; ++$i){
            $text .= $data[$i] ^ $masks[$i%4];
        }

        return $text;
    }

    /**
     * Wraps text in a single unmasked, final WebSocket text frame.
     *
     * @param string $text Payload to send.
     * @return string Frame header followed by the payload.
     */
    private function encode($text){
        // 0x1 text frame (FIN + opcode)
        $b1 = 0x80 | (0x1 & 0x0f);
        $length = strlen($text);

        if($length <= 125)
        {
            $header = pack('CC', $b1, $length);
        }
        elseif($length < 65536)
        {
            // Extended lengths are in network byte order: 'n' is big-endian
            // 16-bit, whereas 'S' would use the machine's byte order.
            $header = pack('CCn', $b1, 126, $length);
        }
        else
        {
            // The 127 form carries an 8-byte big-endian length; emit it as two
            // 32-bit halves so this also works on 32-bit builds.
            $high = PHP_INT_SIZE > 4 ? (($length >> 32) & 0xFFFFFFFF) : 0;
            $header = pack('CCNN', $b1, 127, $high, $length & 0xFFFFFFFF);
        }
        return $header.$text;
    }

    /**
     * Sends text to a client as a WebSocket frame; disconnects the client when
     * the write fails.
     *
     * @param Client $client Recipient.
     * @param string $text   Unencoded message.
     */
    private function send($client, $text){
        $this->console("Client {$client->get_id()}<<".$text);
        $text = $this->encode($text);
        if(socket_write($client->get_socket(), $text, strlen($text)) === false) {
            $this->console("Unable to write to client #{$client->get_id()}'s socket");
            $this->disconnect($client);
        }
    }

    /**
     * Forks a worker that sends a fixed message to the given client.
     *
     * @param Client|null $client Target client; nothing happens when omitted
     *                            (the original referenced an undefined variable).
     */
    private function start_process($client = null){
        if($client === null)
        {
            $this->console("start_process called without a client.");
            return;
        }
        $this->console("start a child process");
        $pid = pcntl_fork();
        if($pid == -1)
        {
            die('could not fork.');
        }
        else if($pid)
        {
            $client->set_pid($pid);
        }
        else
        {
            //we are the child
            $this->send($client, "something to be sent.");
            // The child must not continue running the parent's code path.
            exit(0);
        }
    }

    /**
     * Performs the server side of the WebSocket opening handshake.
     *
     * @param Client $client  Client that sent the upgrade request.
     * @param string $headers Raw HTTP request as received from the socket.
     * @return bool True when the 101 response was sent, false otherwise.
     */
    private function handshake($client, $headers){//data as headers
        $headers = (string) $headers;
        $this->console("Getting client WebSocket version...");
        // HTTP header names are case-insensitive, hence the /i modifier.
        if(!preg_match("/Sec-WebSocket-Version: (.*)\r\n/i", $headers, $match))
        {
            $this->console("The client doesn't support WebSocket.");
            return false;
        }
        $version = trim($match[1]);
        $this->console("Client WebSocket version is {$version}, (required: 13)");
        if((int) $version !== 13)
        {
            $this->console("WebSocket version 13 is required (the client supports version {$version})");
            return false;
        }

        $this->console("Getting headers...");
        // Default to '' so a missing header never leaves a variable undefined.
        $root = '';
        $host = '';
        $origin = '';
        $key = '';
        if(preg_match("/GET (.*) HTTP/", $headers, $match))
        {
            $root = $match[1];
        }
        if(preg_match("/Host: (.*)\r\n/i", $headers, $match))
        {
            $host = trim($match[1]);
        }
        if(preg_match("/Origin: (.*)\r\n/i", $headers, $match))
        {
            $origin = trim($match[1]);
        }
        if(preg_match("/Sec-WebSocket-Key: (.*)\r\n/i", $headers, $match))
        {
            $key = trim($match[1]);
        }

        $this->console("client Headers are:");
        $this->console("\t- Root: ".$root);
        $this->console("\t- Host: ".$host);
        $this->console("\t- Origin: ".$origin);
        $this->console("\t- Sec-WebSocket-Key: ".$key);

        if($key === '')
        {
            $this->console("Sec-WebSocket-Key header is missing.");
            return false;
        }

        $this->console("Generating Sec-WebSocket-Accept key...");
        $acceptKey = $key.'258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
        $acceptKey = base64_encode(sha1($acceptKey, true));

        $upgrade = "HTTP/1.1 101 Switching Protocols\r\n".
                   "Upgrade: websocket\r\n".
                   "Connection: Upgrade\r\n".
                   "Sec-WebSocket-Accept: $acceptKey".
                   "\r\n\r\n";

        $this->console("sending this response to the client #{$client->get_id()}:\r\n".$upgrade);
        if(socket_write($client->get_socket(), $upgrade, strlen($upgrade)) === false)
        {
            $this->console("Unable to write to client #{$client->get_id()}'s socket");
            return false;
        }
        $client->set_handshake(true);
        $this->console("Handshake is successfully done!");
        return true;
    }

    /**
     * Looks up the client that owns the given socket.
     *
     * @param mixed $socket Socket to search for.
     * @return Client|false The matching client, or false when none is tracked.
     */
    private function get_client_by_socket($socket){
        foreach($this->clients as $client)//get all client objects from the pool and check one by one
        {
            if($client->get_socket() === $socket)//if socket returned from the client matches with parameter
            {
                $this->console("client found");
                return $client;
            }
        }
        return false;//no such client
    }

    /**
     * Registers a freshly accepted socket as a client and opens its backend
     * stream. The backend connection is retried a bounded number of times; if
     * it still cannot be opened, the accepted socket is closed so one
     * unreachable backend cannot block the whole server.
     *
     * @param mixed $socket Socket returned by socket_accept().
     */
    private function connect($socket){
        $this->console("creating client...");
        $client_id = uniqid();
        $max_attempts = 5;
        $stream_socket = false;
        for($attempt = 1; $attempt <= $max_attempts; ++$attempt){
            $stream_socket = @stream_socket_client("tcp://127.0.0.1:10000", $errno, $errstr);
            if($stream_socket)
            {
                $this->console("Apollo client created for client #$client_id.");
                break;
            }
            $this->console("creation failed. Attempting to recreate Apollo client.");
            if($attempt < $max_attempts)
            {
                usleep(200000);//short pause instead of a tight retry loop
            }
        }
        if(!$stream_socket)
        {
            $this->console("Unable to create Apollo client for client #$client_id: $errstr ($errno)");
            socket_close($socket);
            return;
        }
        $client = new Client($client_id, $socket, $stream_socket);
        $this->clients[] = $client; //add the socket as client to be tracked
        $this->sockets[] = $socket;//add socket as resource to sockets pool
        $this->stream_sockets[] = $stream_socket;//add socket as resource to stream sockets pool
        $this->console("Client #{$client->get_id()} is successfully created!");
    }

    /**
     * Writes a timestamped log line when verbose mode is on.
     *
     * @param string $text     Message to log.
     * @param bool   $continue When false, the process terminates with $text.
     */
    private function console($text, $continue = true){
        if(!$continue)
        {
            die($text);
        }
        if($this->verbose_mode)
        {
            echo date('[Y-m-d H:i:s] ').$text."\r\n";
        }
    }

}
4

2 回答 2

0

我发现要使套接字选择能够检测来自客户端的数据,套接字服务器必须在握手后立即向客户端发送一条虚拟消息。

于 2012-10-02T03:40:59.913 回答
0

那是因为您必须使用 socket_accept() 接受新的传入连接,并传入您用于 socket_listen() 的那个套接字。

生成的套接字可用于按预期检查传入数据。

编辑

您似乎忘记了在调用 socket_select() 之前设置 $write = $this->sockets;。

于 2012-10-01T07:16:11.320 回答