1

系统

我为(商业)客户端编写了一个基于 PHP 和 Apache 的简单消息分发系统。数据流的简化版本如下所示:通过 HTTP 将 JSON 消息发送到运行在 Apache 服务器上的 PHP 脚本,该脚本对其进行一些处理,然后再次使用 HTTP 将其分发给多个客户端,大约. 10-20。这些消息以不同的时间间隔出现,每秒最多 5 条消息,但有时每分钟只有一条。

使用的服务器是一台功能强大的 Ubuntu 机器,具有 8gB 内存和位于连接良好的数据仓库中的快速网卡。

由于客户端对响应时间有非常严格的要求(对于等待回复的消息,响应时间必须低于 200 毫秒),我实现了自己的 HttpRequest 类,以允许对各种超时和执行持续时间进行非常细粒度的控制。服务器使用此类将消息转发给其客户端。有趣的部分看起来像这样:

class HttpRequest {
    private $host;
    private $port;
    private $path;

    private $connection = null;
    private $headers = array();

    [...]

    const CONTENT_TYPE = 'application/json';
    const ENCODING = 'utf-8';
    const TIMEOUT = 0.2;

    const MAX_RESPONSE_SIZE = 0xFFFF; // 64 kB

    public function __construct($url, $callback = null) {
        if (empty($url)) {
            throw new HttpException("url cannot be empty");
        }

        $url_parts = parse_url($url);

        $this->host = $url_parts['host'];
        $this->port = (empty($url_parts['port']) ? 80 : $url_parts['port']);
        $this->path = $url_parts['path'];

        $this->headers['Host'] = $this->host;
        $this->headers['Connection'] = 'close';
        $this->headers['Cache-Control'] = 'no-cache';
    }

    public function __destruct() {
        try {
            $this->disconnect();
        } catch (Exception $e) {
        }
    }

    private function connect() {
        if ($this->connection != null) {
            // already connected, simply return
            return;
        }

        $errno = '';
        $errstr = '';
        $_timeout = self::TIMEOUT;

        $this->connection = @fsockopen($this->host, $this->port, $errno, $errstr, $_timeout);

        if ($this->connection === false) {
            throw new HttpException("error during connect: $errstr", ($errno == SOCKET_ETIMEDOUT));
        }

        stream_set_timeout($this->connection, (int)(floor($_timeout)), (int)(($_timeout - floor($_timeout)) * 1000000));

        [variable assignments]
    }

    private function disconnect() {
        if ($this->connection == null) {
            // already disconnected, simply return
            return;
        }

        @fclose($this->connection);
        $this->connection = null;
    }

    public function post($data, $fetch_response = true, $path = null) {
        if (empty($data)) {
            throw new HttpException("no data given", false);
        }

        $contenttype = 'application/x-www-form-urlencoded';

        if (is_string($data)) {
            $data = urlencode($data);
        } else if (is_array($data)) {
            $data = http_build_query($data);
        } else if ($data instanceof Message) {
            $data = json_encode($data);

            $contenttype = 'application/json';
        }

        if (!is_string($data)) {
            throw new HttpException("wrong datatype", false);
        }

        $encoding = mb_detect_encoding($data);

        if ($encoding != self::ENCODING) {
            $data = mb_convert_encoding($data, self::ENCODING, $encoding);
        }

        if (empty($path)) {
            $path = $this->path;
        }

        // set header values
        $this->headers['Content-Type'] = $contenttype . '; charset=' . self::ENCODING;
        $this->headers['Content-Length'] = mb_strlen($data);

        // build request
        $request = "POST $path HTTP/1.1\r\n";

        foreach ($this->headers as $header => $value) {
            $request .= "$header: $value\r\n";
        }

        $request .= "\r\n$data";

        // and send it
        $this->sendRequest($request, $fetch_response);

        if ($fetch_response) {
            // fetch and parse response
            $resp = $this->receiveResponse();

            return $resp;
        }
    }

    public function get($path = null) {
        [build and execute http query]
    }

    private function sendRequest($request, $keep_connected = true) {
        // connect the socket
        $this->connect();

        [timer1]

        // write out data
        $result = @fwrite($this->connection, $request);

        [timer2]

        if ($result === false || $result != mb_strlen($request)) {
            $this->disconnect();
            throw new HttpException("write to socket failed", false);
        }

        if (!$keep_connected) {
            $this->disconnect();
        }
    }

    private function receiveResponse() {
        [fetch response using stream_select() and fgets() while strictly observing the timeout]
    }

    private function parseLine($msg) {
        [process http response, used in receiveResponse()]
    }
}

这个类通常是这样使用的:

$request = new HttpRequest($url);
$request->post($data);

问题

每隔一段时间,一些消息就会过期并记录超过 5 秒的超时。仅从代码来看,这应该是不可能的,因为对 IO 相关函数的每次调用都应该在这段时间之前很久就超时。

分析语句(在代码中表示为 [timer1] 和 [timer2])显示对 HttpRequest->connect() 的调用是发生延迟的地方。我最好的猜测是 fsockopen() 出于某种原因忽略了交给它的超时。

有趣的是,每当超时超过给定限制时,它们通常只是超过 5 秒,这让我相信在较低层的网络代码中的某处存在 5 秒的延迟(例如,套接字资源耗尽)。可能可以排除与 DNS 相关的问题,因为当使用 IP 地址指定主机时也会发生此行为。

该问题通常发生在将消息分发给多个客户端时,这意味着将快速连续发送多个请求,但是我在仅向一个客户端发送单个消息时也注意到了它。当对 Apache 有多个同时请求时,它似乎通常发生,但并非必然发生。

有没有人遇到过类似的问题?互联网不是很详尽,也没有通过 PHP 源代码工作。有关如何解决此问题的任何指示?

4

0 回答 0