1

几个月前,我需要创建一个实时表格来显示网站中访问者的行为。所以我开始通过谷歌搜索并使用其他人编写的一些代码来学习 php 中的套接字编程。结果是创建了这两个文件,帮助我实现了我所需要的。

但是由于某些未知原因,有时套接字服务器会停止接受新连接,尽管我可以看到它仍在我的服务器进程中运行。这种情况每月只发生一次或两次,我不得不终止该过程并重新启动它。该网站每天有大约 1500 名访问者,每个人在网上停留大约 2-10 分钟并填写一些表格,我们将在实时表格上看到他们的行为。

我有两组不同的客户端连接到该服务器。第一个是可以看到实时表格的版主,第二个是我的 Web 应用程序发送数据包以更新表格中的信息,每当访问者进行一些更改或做一些事情时。

我在 ssh 中使用screen命令来运行该进程。

所以我需要的是找到一种方法来调试它并找到问题。或者也许有人可以通过阅读我的代码在这里找到问题。

套接字.php:

<?php
class socket {
    function send($socket, $message) {
        $messageLength = strlen($message);
        return @socket_write($socket,$message,$messageLength);
    }

    function broadcast($clients, $message) {
        $messageLength = strlen($message);
        foreach($clients as $socket) {
            @socket_write($socket, $message, $messageLength);
        }
        return true;
    }

    function unseal($socketData) {
        $length = @ord($socketData[1]) & 127;
        if($length == 126) {
            $masks = substr($socketData, 4, 4);
            $data = substr($socketData, 8);
        }
        elseif($length == 127) {
            $masks = substr($socketData, 10, 4);
            $data = substr($socketData, 14);
        }
        else {
            $masks = substr($socketData, 2, 4);
            $data = substr($socketData, 6);
        }
        $socketData = "";
        for ($i = 0; $i < strlen($data); ++$i) {
            $socketData .= $data[$i] ^ $masks[$i%4];
        }
        return $socketData;
    }

    function seal($socketData) {
        if(is_array($socketData)) {
            $socketData = json_encode($socketData);
        }
        $b1 = 0x80 | (0x1 & 0x0f);
        $length = strlen($socketData);

        if($length <= 125)
            $header = pack('CC', $b1, $length);
        elseif($length > 125 && $length < 65536)
            $header = pack('CCn', $b1, 126, $length);
        elseif($length >= 65536)
            $header = pack('CCNN', $b1, 127, $length);
        return $header.$socketData;
    }

    function doHandshake($received_header,$client_socket_resource, $host_name, $port) {
        $headers = $this->headersToArray($received_header);

        if(isset($headers['Sec-WebSocket-Key'])) {
            $secKey = $headers['Sec-WebSocket-Key'];
            $secAccept = base64_encode(pack('H*', sha1($secKey . 'XXXX-XXXX')));
            $buffer = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
                "Upgrade: websocket\r\n" .
                "Connection: Upgrade\r\n" .
                "WebSocket-Origin: $host_name\r\n" .
                "WebSocket-Location: ws://$host_name:$port/\r\n" .
                "Sec-WebSocket-Accept:$secAccept\r\n\r\n";
            socket_write($client_socket_resource, $buffer, strlen($buffer));
            return $headers;
        } else {
            return false;
        }
    }

    function headersToArray($received_header) {
        $headers = array();
        $lines = preg_split("/\r\n/", $received_header);
        foreach($lines as $line)
        {
            $line = chop($line);
            if(preg_match('/\A(\S+): (.*)\z/', $line, $matches))
            {
                $headers[$matches[1]] = $matches[2];
            }
        }
        return $headers;
    }

    function getCookiesFromHeader($headersArray)
    {
        $c = @$headersArray['Cookie'];
        $c = str_replace(' ','',$c);
        $cookies = explode(';',$c);
        $res = [];
        foreach($cookies as $cookie) {
            $temp = explode('=',$cookie);
            if(isset($temp[0],$temp[1])) {
                $res[$temp[0]] = $temp[1];
            }
        }
        return $res;
    }
}

和 socket-server.php :

<?php
// don't timeout!
set_time_limit(0);

require_once('include/config.php');
require_once('core/socket.php');
require_once('core/db.php');

error_reporting(E_ALL);
ini_set('display_errors',1);

// create socket
$socket = socket_create(AF_INET, SOCK_STREAM, 0) or die("Could not create socket\n");
if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
    echo socket_strerror(socket_last_error($socket));
    exit;
}
// bind socket to port
$result = socket_bind($socket, SOCKET_HOST, SOCKET_PORT) or die("Could not bind to socket\n");
// start listening for connections
$result = socket_listen($socket, 3) or die("Could not set up socket listener\n");


$clients = array($socket);
$clientsToWrite = array();
$wsh = new socket();
$i = 0;
while (true) {
    $read = $clients;
    $msgToAll = '';
    // get a list of all the clients that have data to be read from
    // if there are no clients with data, go to next iteration
    $write = $except = NULL;
    if (socket_select($read, $write, $except, NULL) < 1)
        continue;

    if (in_array($socket, $read)) {
        //echo "socket is in array\n";
        $newSocket = socket_accept($socket);

        $header = @socket_read($newSocket, 1024);
        if(substr($header,0,6) == 'update') { // connection from web application
            $header = str_replace('update','',$header);
            $update = json_decode($header,true);
            $msgToAll = $update;
            //echo "New update received\n";
            socket_write($newSocket,'OK');
            socket_close($newSocket);
        } elseif($header == '3X!T') { // exit command
            socket_write($newSocket,'OK');
            socket_close($newSocket);
            socket_close($socket);
            die('');
        } else {
            // New connection from moderator section
            // Lets do handshake";
            if($headers_arr = $wsh->doHandShake($header, $newSocket, SOCKET_HOST, SOCKET_PORT)) {
                if ($data = $wsh->unseal(@socket_read($newSocket, 1024))) {
                    if ($session = checkSessData($data)) {
                        $clientsToWrite[] = [
                            'session' => $session,
                            'socket' => $newSocket,
                        ];
                        $clients[] = $newSocket;
                        // send the client a welcome message
                        $msg = ['type' => 'system', 'do' => 'connected'];
                        $msg = $wsh->seal($msg);
                        if ($bytes = socket_write($newSocket, $msg)) {
                            //echo "sent {$bytes} bytes to client\n";
                        } else {
                            $err = socket_strerror(socket_last_error());
                            echo "err {$err}\n";
                        }
                        socket_getpeername($newSocket, $ip);
                        //echo "New client connected: {$ip}\n";
                    } else {
                        $msg = ['type' => 'system', 'do' => 'unauthorized access'];
                        $msg = $wsh->seal($msg);
                        socket_write($newSocket, $msg);
                        socket_close($newSocket);
                    }
                } else {
                    socket_close($newSocket);
                }
            } else {
                socket_close($newSocket);
            }
        }

        // remove the listening socket from the clients-with-data array
        $key = array_search($socket, $read);
        unset($read[$key]);

    }

    if(!empty($msgToAll)) {
        $method = $msgToAll['method'];
        $user_id = $msgToAll['user_id'];
        unset($msgToAll['method'],$msgToAll['user_id']);
        $data = $wsh->seal($msgToAll);
        foreach ($clientsToWrite as $cid => $ctw) {
            if($ctw['session']['method'] == $method) {
                if ($ctw['session']['section'] == 'admin' || $ctw['session']['user_id'] == $user_id) {
                    // write the message to the client -- add a newline character to the end of the message
                    if(@socket_write($ctw['socket'], $data) === false) {
                        @socket_close($ctw['socket']);
                        unset($clientsToWrite[$cid]);
                        $key = array_search($ctw['socket'],$clients);
                        unset($clients[$key]);
                    }
                }
            }

        }
    }

    // loop through all the clients that have data to read from
    foreach ($read as $k => $read_sock) {
        //echo "loop through read {$k}\n";
        // read until newline or 1024 bytes
        // socket_read while show errors when the client is disconnected, so silence the error messages
        $data = socket_read($read_sock, 1024);
        $res = searchClients($read_sock, $clientsToWrite);

        // check if the client is disconnected
        if ($data === false) {
            // remove client for $clients array
            if($res) {
                unset($clientsToWrite[$res]);
            }
            //echo "client disconnected.\n";
            // continue to the next client to read from, if any
            continue;
        }

        if(empty($data))
            continue;
        $data = $wsh->unseal($data);
        //echo 'new data received : '.$data."\n";
        $data = json_decode($data,true);
        $message = '';
        DB::connect();
        switch($data['action']) {
            case 'addToSP' :
                // do some stuff in db
                $message = 'stuff done successfully!';
                break;
            case 'deleteFromSP' :
                // do some stuff in db
                $message = 'stuff done successfully!';
                break;
        }
        DB::disconnect();
        $message = [
            'type' => 'alert',
            'message' => $message,
        ];
        @socket_write($read_sock,$wsh->seal($message));

    } // end of reading foreach
}

// close the listening socket
socket_close($socket);

function checkSessData($data) {
    $data = json_decode($data,true);
    if(isset($data['method'],$data['user_id'],$data['hash'])) {
        $file_path = 'pathToLiveSessionsCache/'.$data['user_id'].'-'.$data['hash'].'.txt';
        if(file_exists($file_path)) {
            $content = file_get_contents($file_path);
            $content = explode(',',$content);
            if($content[0] == $data['section'].'_'.$data['method']) {
                if($content[1]+300 >= time()) { // max 5 min after starting page
                    unlink($file_path);
                    return $data;
                }
            }
            unlink($file_path);
        }
    }
    return false;
}

function searchClients($socket, $list)
{
    foreach($list as $k => $item) {
        if($item['socket'] == $socket) {
            return $k;
        }
    }
    return false;
}

我的 Web 应用程序将使用此函数将数据包发送到套接字服务器:

function sendDataToSocketServer($data,$type='update')
{
    $message = $type;
    if(!empty($data)) {
        $message .= json_encode($data);
    }
    $socket = socket_create(AF_INET, SOCK_STREAM, 0);
    socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, array('sec' => 1, 'usec' => 0));
    socket_set_option($socket, SOL_SOCKET, SO_SNDTIMEO, array('sec' => 1, 'usec' => 0));
    $result = socket_connect($socket, SOCKET_HOST, SOCKET_PORT);
    socket_write($socket, $message, strlen($message));
    $result = socket_read($socket, 1024);
    socket_close($socket);
}

这也是我用于版主连接到套接字服务器的 js 代码:

var ws;
    function connect() {
        ws = new WebSocket('ws://myDomain.com:12345');
        ws.onopen = function () {
            ws.send('<?=$liveSessData;?>');
        };

        ws.onmessage = function (evt) {
            var msg = JSON.parse(evt.data);
            switch(msg.type) {
                case 'system':
                    systemNotif(msg);
                    break;
                case 'new':
                    newData(msg);
                    break;
                case 'change':
                    updateData(msg);
                    break;
            }
        };

        ws.onclose = function () {
            disconnected();
        };
    }
4

0 回答 0