4

我正在尝试将使用 IOCP 的现有 Windows C++ 代码移植到 Linux。决定使用epoll_wait来实现高并发后,我已经面临一个理论上的问题,即我们何时尝试处理接收到的数据。

想象一下两个线程调用epoll_wait,并且接收到两条结果消息,这样 Linux 就会解除对第一个线程的阻塞,很快就会解除第二个线程的阻塞。

例子 :

Thread 1 blocks on epoll_wait
Thread 2 blocks on epoll_wait
Client sends a chunk of data 1
Thread 1 deblocks from epoll_wait, performs recv and tries to process data
Client sends a chunk of data 2
Thread 2 deblocks, performs recv and tries to process data.

这种情况可以想象吗?即它可以发生吗?

有没有办法防止它,以避免在接收/处理代码中实现同步?

4

5 回答 5

5

如果您有多个线程从同一组 epoll 句柄中读取,我建议您将 epoll 句柄置于一次性级别触发模式,使用EPOLLONESHOT. 这将确保在一个线程观察到触发的句柄之后,在您使用epoll_ctl重新武装句柄之前没有其他线程会观察到它。

如果需要独立处理读写路径,可能需要将读写线程池完全拆分;有一个用于读取事件的 epoll 句柄和一个用于写入事件的 epoll 句柄,并将线程独占分配给其中一个或另一个。此外,为读取和写入路径设置单独的锁。当然,就修改任何每个套接字的状态而言,您必须小心读取和写入线程之间的交互。

如果您确实采用这种拆分方法,则需要考虑如何处理套接字闭包。您很可能需要一个额外的共享数据锁和“确认关闭”标志,设置在共享数据锁下,用于读取和写入路径。然后读写线程可以竞相确认,最后一个确认的线程可以清理共享数据结构。也就是说,像这样:

void OnSocketClosed(shareddatastructure *pShared, int writer)
{
  epoll_ctl(myepollhandle, EPOLL_CTL_DEL, pShared->fd, NULL);
  LOCK(pShared->common_lock);
  if (writer)
    pShared->close_ack_w = true;
  else
    pShared->close_ack_r = true;

  bool acked = pShared->close_ack_w && pShared->close_ack_r;
  UNLOCK(pShared->common_lock);

  if (acked)
    free(pShared);
}
于 2011-04-04T17:31:58.160 回答
3

我在这里假设您尝试处理的情况是这样的:

您有多个(可能非常多)要同时接收数据的套接字;

您希望在第一次接收到来自线程 A 的第一个连接的数据时开始处理数据,然后确保在线程 A 中完成处理之前,不会在任何其他线程上处理来自该连接的数据。

当你这样做时,如果现在在不同的连接上接收到一些数据,你希望线程 B 选择该数据并处理它,同时仍然确保在线程 B 完成它之前没有其他人可以处理这个连接等。

在这些情况下,事实证明在多个线程中使用 epoll_wait() 和相同的 epoll fd 是一种相当有效的方法(我并不是说它一定是最有效的)。

这里的技巧是使用 EPOLLONESHOT 标志将单个连接 fds 添加到 epoll fd 中。这确保了一旦从 epoll_wait() 返回了 fd,它就不会受到监控,直到您明确告诉 epoll 再次对其进行监控。这确保处理此连接的线程不会受到干扰,因为在此线程标记要再次监视的连接之前,没有其他线程可以处理相同的连接。

您可以使用 epoll_ctl() 和 EPOLL_CTL_MOD 设置 fd 以再次监视 EPOLLIN 或 EPOLLOUT。

在多线程中使用像这样的 epoll 的一个显着好处是,当一个线程完成连接并将其添加回 epoll 监控集时,即使在前一个处理线程返回之前,仍在 epoll_wait() 中的任何其他线程都会立即监控它到 epoll_wait()。顺便说一句,如果另一个线程现在立即获取该连接,这也可能是一个缺点,因为缺少缓存数据局部性(因此需要获取该连接的数据结构并刷新前一个线程的缓存)。最有效的方法将敏感地取决于您的确切使用模式。

如果您尝试在不同线程中处理随后在同一连接上收到的消息,那么这种使用 epoll 的方案将不适合您,并且使用侦听线程提供有效队列提供工作线程的方法可能会更好。

于 2011-04-04T17:32:46.993 回答
2

以前的答案指出从多个线程调用 epoll_wait() 是一个坏主意,这几乎肯定是正确的,但我对这个问题很感兴趣,试图弄清楚从同一个句柄上的多个线程调用它时会发生什么,等待同一个套接字。我写了以下测试代码:

#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

struct thread_info {
  int number;
  int socket;
  int epoll;
};

void * thread(struct thread_info * arg)
{
    struct epoll_event events[10];
    int s;
    char buf[512];

    sleep(5 * arg->number);
    printf("Thread %d start\n", arg->number);

    do {
        s = epoll_wait(arg->epoll, events, 10, -1);

        if (s < 0) {
            perror("wait");
            exit(1);
        } else if (s == 0) {
            printf("Thread %d No data\n", arg->number);
            exit(1);
        }
        if (recv(arg->socket, buf, 512, 0) <= 0) {
            perror("recv");
            exit(1);
        }
        printf("Thread %d got data\n", arg->number);
    } while (s == 1);

    printf("Thread %d end\n", arg->number);

    return 0;
}

int main()
{
    pthread_attr_t attr;
    pthread_t threads[2];
    struct thread_info thread_data[2];
    int s;
    int listener, client, epollfd;
    struct sockaddr_in listen_address;
    struct sockaddr_storage client_address;
    socklen_t client_address_len;
    struct epoll_event ev;

    listener = socket(AF_INET, SOCK_STREAM, 0);

    if (listener < 0) {
        perror("socket");
        exit(1);
    }

    memset(&listen_address, 0, sizeof(struct sockaddr_in));
    listen_address.sin_family = AF_INET;
    listen_address.sin_addr.s_addr = INADDR_ANY;
    listen_address.sin_port = htons(6799);

    s = bind(listener,
             (struct sockaddr*)&listen_address,
             sizeof(listen_address));

    if (s != 0) {
        perror("bind");
        exit(1);
    }

    s = listen(listener, 1);

    if (s != 0) {
        perror("listen");
        exit(1);
    }

    client_address_len = sizeof(client_address);
    client = accept(listener,
                    (struct sockaddr*)&client_address,
                    &client_address_len);

    epollfd = epoll_create(10);
    if (epollfd == -1) {
        perror("epoll_create");
        exit(1);
    }

    ev.events = EPOLLIN;
    ev.data.fd = client;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, client, &ev) == -1) {
        perror("epoll_ctl: listen_sock");
        exit(1);
    }

    thread_data[0].number = 0;
    thread_data[1].number = 1;
    thread_data[0].socket = client;
    thread_data[1].socket = client;
    thread_data[0].epoll = epollfd;
    thread_data[1].epoll = epollfd;

    s = pthread_attr_init(&attr);
    if (s != 0) {
        perror("pthread_attr_init");
        exit(1);
    }

    s = pthread_create(&threads[0],
                       &attr,
                       (void*(*)(void*))&thread,
                       &thread_data[0]);

    if (s != 0) {
        perror("pthread_create");
        exit(1);
    }

    s = pthread_create(&threads[1],
                       &attr,
                       (void*(*)(void*))&thread,
                       &thread_data[1]);

    if (s != 0) {
        perror("pthread_create");
        exit(1);
    }

    pthread_join(threads[0], 0);
    pthread_join(threads[1], 0);

    return 0;
}

当数据到达,并且两个线程都在等待 epoll_wait() 时,只有一个会返回,但是随着后续数据的到达,唤醒处理数据的线程实际上在两个线程之间是随机的。我无法找到影响唤醒哪个线程的方法。

似乎调用 epoll_wait 的单个线程最有意义,将事件传递给工作线程以泵送 IO。

于 2011-04-04T20:51:46.807 回答
1

我相信使用 epoll 和每个核心线程的高性能软件会创建多个 epoll 句柄,每个句柄都处理所有连接的子集。这样分工,但避免了你描述的问题。

于 2011-04-04T16:50:54.723 回答
0

通常,epoll当您有一个线程在单个异步源上侦听数据时使用。为避免忙于等待(手动轮询),您可以epoll在数据准备好时通知您(很像select)。

从单个数据源读取多个线程不是标准做法,至少我认为这是不好的做法。

如果您想使用多个线程,但您只有一个输入源,则指定其中一个线程来侦听数据并将数据排队,以便其他线程可以从队列中读取单个数据。

于 2011-04-04T16:47:51.950 回答