4

我最近对内核事件进行了一些测试,结果如下:

  • 使用内核事件来接受套接字是否有意义?我的测试表明我一次只能处理一个接受(即使事件列表数组更大)(对我来说很有意义,因为 .ident == sockfd 仅适用于一个套接字)。

  • 我以为kevent的使用主要是一次从多个socket中读取。真的吗?

这是 TCP 服务器如何使用 kqueue 实现完成的吗?:


  • 监听线程(无 kqueue)
    • 接受新连接并将 FD 添加到 worker kqueue。 问题:这甚至可能吗?我的测试显示是的,但它是否保证工作线程会知道这些更改并且 kevent 真的是线程安全的?

  • 工作线程(带 kqueue)

    • 等待对从侦听线程添加的文件描述符的读取。

    问题:一次检查更新有多少个套接字有意义?


谢谢

4

2 回答 2

3

这不是一个真正的答案,但我制作了一个小服务器脚本来kqueue解释问题:

#include <stdio.h>          // fprintf
#include <sys/event.h>      // kqueue
#include <netdb.h>          // addrinfo
#include <arpa/inet.h>      // AF_INET
#include <sys/socket.h>     // socket
#include <assert.h>         // assert
#include <string.h>         // bzero
#include <stdbool.h>        // bool
#include <unistd.h>         // close

int main(int argc, const char * argv[])
{

    /* Initialize server socket */
    struct addrinfo hints, *res;
    int sockfd;

    bzero(&hints, sizeof(hints));
    hints.ai_family     = AF_INET;
    hints.ai_socktype   = SOCK_STREAM;

    assert(getaddrinfo("localhost", "9090", &hints, &res) == 0);

    sockfd = socket(AF_INET, SOCK_STREAM, res->ai_protocol);

    assert(sockfd > 0);

    {
        unsigned opt = 1;

        assert(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == 0);

        #ifdef SO_REUSEPORT
        assert(setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) == 0);
        #endif
    }

    assert(bind(sockfd, res->ai_addr, res->ai_addrlen) == 0);

    freeaddrinfo(res);

    /* Start to listen */
    (void)listen(sockfd, 5);

    {
        /* kevent set */
        struct kevent kevSet;
        /* events */
        struct kevent events[20];
        /* nevents */
        unsigned nevents;
        /* kq */
        int kq;
        /* buffer */
        char buf[20];
        /* length */
        ssize_t readlen;

        kevSet.data     = 5;    // backlog is set to 5
        kevSet.fflags   = 0;
        kevSet.filter   = EVFILT_READ;
        kevSet.flags    = EV_ADD;
        kevSet.ident    = sockfd;
        kevSet.udata    = NULL;

        assert((kq = kqueue()) > 0);

        /* Update kqueue */
        assert(kevent(kq, &kevSet, 1, NULL, 0, NULL) == 0);

        /* Enter loop */
        while (true) {
            /* Wait for events to happen */
            nevents = kevent(kq, NULL, 0, events, 20, NULL);

            assert(nevents >= 0);

            fprintf(stderr, "Got %u events to handle...\n", nevents);

            for (unsigned i = 0; i < nevents; ++i) {
                struct kevent event = events[i];
                int clientfd        = (int)event.ident;

                /* Handle disconnect */
                if (event.flags & EV_EOF) {

                    /* Simply close socket */
                    close(clientfd);

                    fprintf(stderr, "A client has left the server...\n");
                } else if (clientfd == sockfd) {
                    int nclientfd = accept(sockfd, NULL, NULL);

                    assert(nclientfd > 0);

                    /* Add to event list */
                    kevSet.data     = 0;
                    kevSet.fflags   = 0;
                    kevSet.filter   = EVFILT_READ;
                    kevSet.flags    = EV_ADD;
                    kevSet.ident    = nclientfd;
                    kevSet.udata    = NULL;

                    assert(kevent(kq, &kevSet, 1, NULL, 0, NULL) == 0);

                    fprintf(stderr, "A new client connected to the server...\n");

                    (void)write(nclientfd, "Welcome to this server!\n", 24);
                } else if (event.flags & EVFILT_READ) {

                    /* sleep for "processing" time */
                    readlen = read(clientfd, buf, sizeof(buf));

                    buf[readlen - 1] = 0;

                    fprintf(stderr, "bytes %zu are available to read... %s \n", (size_t)event.data, buf);

                    sleep(4);
                } else {
                    fprintf(stderr, "unknown event: %8.8X\n", event.flags);
                }
            }
        }
    }

    return 0;
}

每次客户端发送内容时,服务器都会经历 4 秒的“滞后”。(我夸大了一点,但对于测试来说相当合理)。那么如何解决这个问题呢?我认为具有自己的工作线程(池)kqueue作为可能的解决方案,然后不会发生连接延迟。(每个工作线程读取某个“范围”的文件描述符)

于 2014-08-10T17:40:01.740 回答
3

通常,您使用 kqueue 作为线程的替代方案。如果你打算使用线程,你可以设置一个监听线程和一个工作线程池,每个接受的连接一个线程。这是一个更简单的编程模型。

在事件驱动的框架中,您会将侦听套接字和所有接受的套接字都放入 kqueue,然后在事件发生时处理它们。当您接受一个套接字时,您将它添加到 kqueue,当套接字处理程序完成它的工作时,它可以从 kqueue 中删除该套接字。(后者通常不是必需的,因为关闭 fd 会自动从任何 kqueue 中删除任何关联的事件。)

请注意,使用 kqueue 注册的每个事件都有一个void*用户数据,可用于在事件触发时识别所需的操作。所以没有必要每个事件队列都有一个唯一的事件处理程序;事实上,拥有各种处理程序是很常见的。(例如,您可能还想处理通过命名管道设置的控制通道。)

混合事件/线程模型当然是可能的;否则,您将无法利用多核 CPU。一种可能的策略是将事件队列用作生产者-消费者模型中的调度程序。队列处理程序将直接处理侦听套接字上的事件,接受连接并将接受的 fd 添加到事件队列中。当客户端连接事件发生时,该事件将被发布到工作队列中以供以后处理。也可以有多个工作队列,每个线程一个,并让接受者猜测新连接应该放在哪个工作队列中,大概是根据该线程的当前负载。

于 2014-08-10T16:40:08.050 回答