我正在编写一个 WebSocket 服务器。虽然握手成功并且服务器可以使用 RFC 标准发送编码数据,但 socket_select() 只能在连接新客户端时检测更改,而不是在客户端向服务器发送数据时检测到更改。怎么了?
class Server{
private $address;
private $port;
private $master;
private $sockets;
private $stream_sockets;
private $clients;
private $verbose_mode;
function __construct($address = '127.0.0.1', $port = 5001, $verbose_mode = true){
$this->address = $address;
$this->port = $port;
$this->verbose_mode = $verbose_mode;
$this->console("Socket server is starting...");
//socket creation
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if(!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1))
{
$this->console("Failed to set socket option: ".socket_strerror(socket_last_error()));
}
if(!socket_set_nonblock($socket))
{
$this->console("Failed to set socket nonblock: ".socket_strerror(socket_last_error()));
}
if(!is_resource($socket))
{
$this->console("Failed to create socket: ".socket_strerror(socket_last_error()));
}
if(!socket_bind($socket, $this->address, $this->port))
{
$this->console("Failed to bind socket: ".socket_strerror(socket_last_error()));
}
if(!socket_listen($socket, 20))
{
$this->console("Failed to listen: ".socket_strerror(socket_last_error()));
}
$this->master = $socket; //add current socket resource as master socket
$this->sockets = array($socket);//add current resource to all sockets
$this->console("Socket server started on {$this->address}:{$this->port}");
}
public function run(){
$this->console("Start running...");
$write = array();
$except = array();
while(isset($this->sockets[0])){
$changed_sockets = $this->sockets;
$result = socket_select($changed_sockets, $write, $except, 1 ,0);
//@stream_select($changed_sockets, $write = null, $except = null, 0, 5000);
if($result > 0)
{
$this->console("number of sockets: ".count($this->sockets));
$this->console("number of changed sockets: ".$result);
foreach($changed_sockets as $socket)
{
if($socket == $this->master)//self changed
{
if(($accepted_socket = socket_accept($this->master))!== false)//incoming connection
{
$this->connect($accepted_socket);//add as a client to the pool with info to be tracked without handshaking at first
}
else
{
$this->console("Socket error: ".socket_strerror(socket_last_error()));
}
}
else //must be others in sockets pool
{
$this->console("Finding socket associated with the client...");
$client = $this->get_client_by_socket($socket); //get client object to track them using socket that they are associated with
if($client){
$this->console("receiving data from the client.");
$bytes = @socket_recv($socket, $data, 2048, MSG_DONTWAIT);
$this->console("byte size received: $bytes");
if(!$client->get_handshake())//check if handshaking has done
{
$this->console("handshaking...");
$this->handshake($client, $data);//handshaking if it is not done previously
}
else if($bytes === 0)
{
$this->disconnect($client);
}
else
{
$this->console("incoming data from client {client->get_id()}");
$this->read($client, $data);//read from client if there are changes in sockets
}
}
}
}
}
}
}
private function slingshot($client, $read){
$send="00:00:00:00".",DM,SAY,0,".$read;
fwrite($client->get_stream_socket(), $send);//output to apollo
//workaround for apollo
if($client->get_initial())
{
$initial = 7;
$continue = 0;
}
else
{
$initial = 8;
$continue = 1;
}
while(TRUE)
{
//input from iris
$num = fgets($client->get_stream_socket(), $initial);//$number of words
if(ltrim($num) > 0)
{
$res = fgets($client->get_stream_socket(), ltrim($num)+1);
if($res!="")
{
fgets($fp,1);
$client->set_initial(false);
$res = $num.$res;
$res = substr($res,6+$continue);
//output to client
$message = rtrim($res);
send($client, $message);
break;
}
}
}
}
private function read($client, $received){
$read = $this->unmask($received);
$this->console("received from client: ".$read);
if($read == "##client exit##") {
$this->console("Killing a child process");
posix_kill($client->get_pid(), SIGTERM);
$this->console("Process {$client->get_pid()} is terminated.");
}
else
{
$this->console("start a child process");
$pid = pcntl_fork();
if($pid == -1)
{
die('could not fork.');
}
else if($pid)
{
$client->set_pid($pid);
}
else
{
//we are the child
$this->slingshot($client, $read);
}
}
}
private function disconnect($client){
$this->console("Disconnecting client #{$client->get_id()}");
$i = array_search($client, $this->clients);//search client in clients pool
$j = array_search($client->get_socket(), $this->sockets);//search client's socket in socket pool
if($j >= 0)
{
array_splice($this->sockets, $j, 1);
socket_close($client->get_socket());
$this->console("Socket closed.");
}
if($i >= 0)
{
array_splice($this->clients, $i, 1);
}
$this->console("Client #{$client->get_id()} disconnected.");
}
private function unmask($payload) {
$length = ord($payload[1]) & 127;
if($length == 126)
{
$masks = substr($payload, 4, 4);
$data = substr($payload, 8);
}
elseif($length == 127)
{
$masks = substr($payload, 10, 4);
$data = substr($payload, 14);
}
else
{
$masks = substr($payload, 2, 4);
$data = substr($payload, 6);
}
$text = '';
for ($i = 0; $i < strlen($data); ++$i){
$text .= $data[$i] ^ $masks[$i%4];
}
return $text;
}
private function encode($text){
// 0x1 text frame (FIN + opcode)
$b1 = 0x80 | (0x1 & 0x0f);
$length = strlen($text);
if($length <= 125)
{
$header = pack('CC', $b1, $length);
}
elseif($length > 125 && $length < 65536)
{
$header = pack('CCS', $b1, 126, $length);
}
elseif($length >= 65536)
{
$header = pack('CCN', $b1, 127, $length);
}
return $header.$text;
}
private function send($client, $text){
$this->console("Client {$client->get_id()}<<".$text);
$text = $this->encode($text);
if(socket_write($client->get_socket(), $text, strlen($text)) === false) {
$this->console("Unable to write to client #{$client->get_id()}'s socket");
$this->disconnect($client);
}
}
private function start_process(){
$this->console("start a child process");
$pid = pcntl_fork();
if($pid == -1)
{
die('could not fork.');
}
else if($pid)
{
$client->set_pid($pid);
}
else
{
//we are the child
$this->send($client, "something to be sent.");
}
}
private function handshake($client, $headers){//data as headers
$this->console("Getting client WebSocket version...");
if(preg_match("/Sec-WebSocket-Version: (.*)\r\n/", $headers, $match))
{
$version = $match[1];
}
else
{
$this->console("The client doesn't support WebSocket.");
}
$this->console("Client WebSocket version is {$version}, (required: 13)");
if($version == 13)
{
$this->console("Getting headers...");
if(preg_match("/GET (.*) HTTP/", $headers, $match))
{
$root = $match[1];
}
if(preg_match("/Host: (.*)\r\n/", $headers, $match))
{
$host = $match[1];
}
if(preg_match("/Origin: (.*)\r\n/", $headers, $match))
{
$origin = $match[1];
}
if(preg_match("/Sec-WebSocket-Key: (.*)\r\n/", $headers, $match))
{
$key = $match[1];
}
$this->console("client Headers are:");
$this->console("\t- Root: ".$root);
$this->console("\t- Host: ".$host);
$this->console("\t- Origin: ".$origin);
$this->console("\t- Sec-WebSocket-Key: ".$key);
$this->console("Generating Sec-WebSocket-Accept key...");
$acceptKey = $key.'258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
$acceptKey = base64_encode(sha1($acceptKey, true));
$upgrade = "HTTP/1.1 101 Switching Protocols\r\n".
"Upgrade: websocket\r\n".
"Connection: Upgrade\r\n".
"Sec-WebSocket-Accept: $acceptKey".
"\r\n\r\n";
$this->console("sending this response to the client #{$client->get_id()}:\r\n".$upgrade);
socket_write($client->get_socket(), $upgrade);
$client->set_handshake(true);
$this->console("Handshake is successfully done!");
return true;
}
else
{
$this->console("WebSocket version 13 is required (the client supports version {$version})");
return false;
}
}
private function get_client_by_socket($socket){
foreach($this->clients as $client)//get all client objects from the pool and check one by one
{
if($client->get_socket() == $socket)//if socket returned from the client matches with parameter
{
$this->console("client found");
return $client;
}
}
return false;//no such client
}
private function connect($socket){
$this->console("creating client...");
$client_id = uniqid();
while(true){
$stream_socket = @stream_socket_client("tcp://127.0.0.1:10000", $errno, $errstr);
if($stream_socket)
{
$this->console("Apollo client created for client #$client_id.");
break;
}
else
{
$this->console("creation failed. Attempting to recreate Apollo client.");
}
}
$client = new Client($client_id, $socket, $stream_socket);
$this->clients[] = $client; //add the socket as client to be tracked
$this->sockets[] = $socket;//add socket as resource to sockets pool
$this->stream_sockets[] = $stream_socket;//add socket as resource to stream sockets pool
$this->console("Client #{$client->get_id()} is successfully created!");
}
private function console($text, $continue = true){
if(!$continue)
{
die($text);
}
if($this->verbose_mode)
{
echo date('[Y-m-d H:i:s] ').$text."\r\n";
}
}
}