1

我用 C 语言编写了一个在 Linux 上运行的单线程异步服务器:套接字是非阻塞的,至于轮询,我使用的是 epoll。基准测试表明服务器运行良好,并且根据 Valgrind 的说法,没有内存泄漏或其他问题。

唯一的问题是,当 write() 命令被中断时(因为客户端关闭了连接),服务器会遇到 EPIPE。我通过使用参数-b运行基准测试实用程序“围攻”来人为地中断。它连续执行许多请求,所有请求都完美运行。现在我按下 CTRL-C 并重新开始“围攻”。有时我很幸运,服务器无法发送完整的响应,因为客户端的 fd 无效。正如预期的那样,errno 设置为 EPIPE。我处理这种情况,在 fd 上执行 close() 然后释放与连接相关的内存。现在的问题是服务器阻塞并且不再正确回答。这是 strace 输出:

epoll_wait(4, {{EPOLLIN, {u32=0, u64=0}}}, 128, -1) = 1
accept(3, {sa_family=AF_INET, sin_port=htons(55328), sin_addr=inet_addr("127.0.0.1")}, [16]) = 5
fcntl64(5, F_GETFL)                     = 0x2 (flags O_RDWR)
fcntl64(5, F_SETFL, O_RDWR|O_NONBLOCK)  = 0
epoll_ctl(4, EPOLL_CTL_ADD, 5, {EPOLLIN|EPOLLERR|EPOLLHUP|EPOLLET, {u32=144039912, u64=144039912}}) = 0
epoll_wait(4, {{EPOLLIN, {u32=144039912, u64=144039912}}}, 128, -1) = 1
read(5, "GET /user/register HTTP/1.1\r\nHos"..., 4096) = 161
send(5, "HTTP/1.1 200 OK\r\nContent-Type: t"..., 106, MSG_NOSIGNAL) = 106 <<<<
send(5, "00001000\r\n", 10, MSG_NOSIGNAL) = -1 EPIPE (Broken pipe)        <<<< Why did the previous send() work?
close(5)                                = 0
epoll_wait(4, {{EPOLLIN, {u32=0, u64=0}}}, 128, -1) = 1
accept(3, {sa_family=AF_INET, sin_port=htons(55329), sin_addr=inet_addr("127.0.0.1")}, [16]) = 5
...

(如果您想知道,我从日志中删除了 printf())

如您所见,客户端建立了一个新连接,因此被接受。然后,它被添加到 EPOLL 队列中。epoll_wait() 表示客户端发送了数据(EPOLLIN)。解析请求并组成响应。发送标头可以正常工作,但是当涉及到正文时, write() 会导致 EPIPE。这不是“围攻”中的错误,因为它会阻止任何传入连接,无论来自哪个客户端。

#include "Connection.h"

static ExceptionManager *exc;

void Connection0(ExceptionManager *e) {
    exc = e;
}

void Connection_Init(Connection *this) {
    Socket_Init(&this->server);
    Socket_SetReusableFlag(&this->server);
    Socket_SetCloexecFlag(&this->server, true);
    Socket_SetBlockingFlag(&this->server, true);
    Socket_ListenTCP(&this->server, 8080, SOMAXCONN);

    // Add the server socket to epoll
    Poll_Init(&this->poll, SOMAXCONN, (void *) &Connection_OnEvent, this);
    Poll_AddEvent(&this->poll, NULL, this->server.fd, EPOLLIN | EPOLLET | EPOLLHUP | EPOLLERR);

    this->activeConn = 0;
}

void Connection_Destroy(Connection *this) {
    Poll_Destroy(&this->poll);
    Socket_Destroy(&this->server);
}

void Connection_Process(Connection *this) {
    Poll_Process(&this->poll, -1);
}

void Connection_AcceptClient(Connection *this) {
    Client *client;

    SocketConnection conn = Socket_Accept(&this->server);
    SocketConnection_SetBlockingFlag(&conn, true);

    client = New(Client);

    client->req = NULL;

    client->conn = New(SocketConnection);
    client->conn->remote = conn.remote;
    client->conn->fd = conn.fd;

    this->activeConn++;

    Poll_AddEvent(&this->poll, client, conn.fd, EPOLLIN | EPOLLET | EPOLLHUP | EPOLLERR);
}

void Connection_DestroyClient(Connection *this, Client *client) {
    // Poll_DeleteEvent(&this->poll, client->conn->fd);
    SocketConnection_Close(client->conn);

    if (client->req != NULL) {
        Request_Destroy(client->req);
        Memory_Free(client->req);
    }

    if (client->conn != NULL) {
        Memory_Free(client->conn);
    }

    Memory_Free(client);

    this->activeConn--;
}

void Connection_OnEvent(Connection *this, int events, Client *client) {
    /* error or connection hung up */
    if (client != NULL && (events & (EPOLLHUP | EPOLLERR))) {
        String_Print(String("EPOLLHUP | EPOLLERR received\n"));
        Connection_DestroyClient(this, client);
        return;
    }

    /* incoming connection */
    if (client == NULL && (events & EPOLLIN)) {
        if (this->activeConn > SOMAXCONN - 1) { /* TODO */
            String_Print(String("Too many connections...\n"));
            return;
        }

        Connection_AcceptClient(this);
        return;
    }

    /* receiving data from client */
    if (client != NULL && (events & EPOLLIN)) {
        if (client->req == NULL) {
            client->req = New(Request);
            Request_Init(client->req, client->conn);
        }

        bool keepOpen = false;

        try (exc) {
            keepOpen = Request_Parse(client->req);
        } catch(&SocketConnection_PipeException, e) {
            printf("Caught PipeException on fd=%d\n", client->conn->fd); fflush(stdout);
        } catch(&SocketConnection_ConnectionResetException, e) {
            printf("Caught ConnectionResetException on fd=%d\n", client->conn->fd); fflush(stdout);
        } finally {
            if (!keepOpen) {
                printf("Will close...\n"); fflush(stdout);
                Connection_DestroyClient(this, client);
            }
        } tryEnd;
    }
}
4

2 回答 2

6

用于设置tosigaction()的动作。然后你将只得到 -1 的返回码,设置为,没有信号。SIGPIPESIG_IGNerrnoEPIPE

在 Linux 上,另一种方法是send()MSG_NOSIGNAL标志一起使用,而不是write(). 这使您可以抑制该写入的信号,而不会影响任何其他信号。BSD 系统上的另一种选择是SO_NOSIGPIPE套接字选项,它禁止SIGPIPE对该套接字的所有写入。

在任何时候,内核 TCP 实现都可能从对等方接收 TCP RST。在该点之后对套接字的下一次写入将导致EPIPE错误,并且SIGPIPE如果它没有被抑制则信号。因此,即使有两次连续写入,第一次可能会成功,而下一次可能会因 EPIPE 和 SIGPIPE 而失败。

更新:虽然它不是发布的代码的一部分,但我在您的 strace 输出中看到您正在调用fcntl64(5, F_SETFL, O_RDONLY|O_NONBLOCK). 由于O_RDONLY为 0,因此您的代码可能只是将标志设置为O_NONBLOCK. 它应该获取当前标志,然后将其设置OldFlags | O_NONBLOCK为以设置非阻塞模式。将套接字设置为只读模式似乎可能会导致写入时出现问题。或者可能是您不小心使用F_GETFD而不是F_GETFL获取旧标志。

于 2010-03-28T20:38:40.343 回答
1

我无法从您的代码中判断(可能是因为我没有在正确的位置查看)在您从其中一个描述符中获取 SIGPIPE 后是否调整了 epoll 结构。从文件描述符中获得 SIGPIPE(或 EPIPE)后,您将在后续使用同一文件描述符时重复。您需要关闭文件描述符,然后适当地调整服务器的内部结构。

于 2010-03-28T22:27:40.320 回答