2

我有一个有两个线程的进程。一个线程,线程 A,将设置 timerfd 计时器,而另一个线程,线程 B,将对这些计时器执行“选择”。一旦计时器到期,线程 B 将向线程 A 指示这一点。

要添加计时器,线程 A 将创建一个新计时器,然后它会唤醒线程 B 以将此计时器包含在其对 select 的调用中。我正试图通过使用文件描述符来唤醒线程 B。然后,线程 B 将在该 FD 上调用 select,并且所有 FD 都通过对 timerfd 的调用返回。

问题是我无法设法创建一个我可以控制的 FD,以便它会在我想要的时候导致 select 阻塞或返回。

我尝试将 shm_open 与 fcntl 调用一起使用,并且我尝试仅在文件上使用 open,但它们都不会导致 select 阻塞。我所有的尝试都会导致 select 立即返回。有什么方法可以创建一个会导致 select 阻塞的 FD,直到我以某种方式更新该 FD?

尝试 1 - 使用 shm_open 创建一个 FD 并使用 fcntl 设置读锁:

从线程 A 创建 FD。

if((wakeUpFd = shm_open("/wakeup", O_RDWR|O_CREAT|O_TRUNC, 0)) == -1)
   printf("Failed to open /wakeup, Errno = %d\n", errno);
else
{
   fcntl(wakeUpFd, F_SETLK, F_RDLCK);
}

从线程 A 添加计时器。

#create a timer and add it to a list
/* wake up timer thread */
fcntl(wakeUpFd, F_SETLK, ~F_RDLCK);

唤醒线程 B

#when select returns
if(FD_ISSET(wakeUpFd, &timerSet))
{
   fcntl(wakeUpFd, F_SETLK, F_RDLCK);
}
#check all other timer FD's

尝试 2 - 使用 shm_open 并向其读取/写入数据:

从线程 A 创建 FD。

if((wakeUpFd = shm_open("/wakeup", O_RDWR|O_CREAT|O_TRUNC, 0)) == -1)
   printf("Failed to open /wakeup, Errno = %d\n", errno);
else
{
   if(ftruncate(wakeUpFd, 2) == -1)
   {
      printf("Failed with ftruncate, Errno = %d\n", errno);
   }
}

从线程 A 添加计时器。

#create a timer and add it to a list
/* wake up timer thread */
if(write(wakeUpFd, wakeUpStr, 1) != 1)
   printf("Failed to write to wakeUpFd\n");

唤醒线程 B

#when select returns
if(FD_ISSET(wakeUpFd, &timerSet))
{
   read(wakeUpFd, wakeUpBuf, 10);
}
#check all other timer FD's

Try 3 - 与 Try 2 几乎相同,但使用 open 而不是 shm_open。

尝试 4 - 与尝试 1 相同,但使用 fcntl(wakeUpFd, F_SETFL, ~O_NONBLOCK) 而不是 fcntl(wakeUpFd, F_SETLK, ~F_RDLCK)

4

2 回答 2

4

阅读select()规范,尤其是它说的那一点:

与常规文件关联的文件描述符应始终为准备读取、准备写入和错误条件选择 true。

您不能select()阻止常规文件的文件描述符。你必须有一个管道,或套接字,或类似的东西,作为你所在的文件select()

于 2013-06-05T16:09:27.163 回答
1

使用 Unix 域套接字对,例如socketpair(AF_UNIX, SOCK_DGRAM, 0, commsd)用于通信。当线程 A 创建一个新的 timerfd 时,它只是将新的描述符 an 写入int通信套接字commsd[0]

当线程 B 注意到通信套接字commsd[1]是可读的时,它会从中读取一个或多个ints。每个int显然都是一个新的计时器描述符,因此线程 B 必须将每个描述符添加到它所select()在的集合中。

在线程 B 中,我建议使用非阻塞读取,bytes = recv(commfd, ptr, len, MSG_DONTWAIT)在循环中从通信套接字读取:

    char     buffer[8 * sizeof(int)];
    size_t   head = 0, tail = 0;
    ssize_t  bytes;
    int      new_timerfd;

    while (1) {

        if (head >= tail)
            head = tail = 0;
        else
        if (head > 0) {
            memmove(buffer, buffer + head, tail - head);
            tail -= head;
            head = 0;
        }

        if (tail < sizeof buffer) {
            bytes = recv(commsd[1], buffer + head, sizeof buffer - tail, MSG_DONTWAIT);
            if (bytes > (ssize_t)0)
                head += bytes;
        }

        if (head >= tail)
            break;

        while (head + sizeof (int) <= tail) {
            /* Unaligned version of new_timerfd = *(int *)(buffer + head); */
            memmove(&new_timerfd, buffer + head, sizeof new_timerfd);
            head += sizeof (int);

            /*
             * Add new_timerfd to select()ed set
            */
        }
    }

上面的循环确实做了一个额外的非阻塞recv()调用来检测它已经读取了所有立即挂起的内容,但是这种方式非常健壮。它甚至不假设您总是可以阅读完整的int. (因为_POSIX_PIPE_BUF总是 的倍数sizeof int,你可以假设你总是读完整int的。)

只要有可用的数据,上述循环就会从套接字进行非阻塞接收,并且循环体会new_timerfd一一提取描述符。我省略了将其添加到select()ed 集的代码。

最后,这种方法也适用于一般情况,当您将较大的结构传输到select()循环中的线程时。只要确保缓冲区足够大,至少可以容纳两个结构,就可以了;s 处理您可能遇到的memmove()任何缓冲区打包和结构对齐问题。

于 2013-06-05T17:54:31.840 回答