1

Linux select() 调用中继事件排序有什么方法吗?

我所看到的描述:

在一台机器上,我编写了一个简单的程序,它发送三个多播数据包,一个到三个不同的多播组中的每一个。这些数据包是背靠背发送的,中间没有延迟。即发送到(mcast_group1);发送到(mcast_group2);发送到(mcast_group3)。

在另一台机器上,我有一个接收程序。该程序为每个多播组使用一个套接字。每个套接字执行一个 bind() 和 IP_ADD_MEMBERSHIP(即加入/订阅)到它所侦听的地址。然后程序在三个套接字上执行 select()。

当 select 返回时,所有三个套接字都可供读取。但是哪个先来?待读套接字列表是一个集合,因此没有顺序。我想要的是如果 select() 每个接收到的数据包只返回一次,按顺序(增加的开销在这里是可以接受的)。或者,我可以使用其他某种机制来确定数据包接收顺序吗?

附加信息:

  • 操作系统是 x86_64 上的 CentOS 5(实际上是 Redhat Enterprise Linux)
  • 网卡硬件是 Intel 82571EB
  • 我已经尝试过 e1000e 驱动程序版本 1.3.10-k2 和 2.1.4-NAPI
  • 我尝试将 NIC 的中断固定到未加载和隔离的 CPU 内核
  • 我通过设置驱动程序选项 InterruptThrottleRate=0 禁用硬件 IRQ 合并,并通过 ethtool 设置 rx-usecs=0
  • 我也尝试使用 epoll,它具有相同的行为

最后一点:如果我只使用一个套接字,则会保留数据包顺序。在这种情况下,我绑定到 INADDR_ANY (0.0.0.0) 并在同一个套接字上多次执行 IP_ADD_MEMBERSHIP。但这对我们的应用程序不起作用,因为我们需要通过绑定到实际多播地址来提供过滤。最终,同一台机器上会有多个多播接收程序,订阅集可能相互交叉。所以也许另一种解决方案是找到另一种方法来实现bind()的过滤效果,但没有bind()。

4

3 回答 3

1

您可以使用IP_PKTINFO来获取数据包发送到的多播组的地址 - 即使套接字订阅了一堆多播组。有了这个,您将按顺序获取数据包并能够按组地址进行过滤。请参见下面的示例:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <ctype.h>
#include <errno.h>

#define PORT 1234
#define PPANIC(msg) perror(msg); exit(1);
#define STATS_PATCH 0

int main(int argc, char **argv)
{
    fd_set master;
    fd_set read_fds;
    struct sockaddr_in serveraddr;
    int sock;
    int opt = 1;
    size_t i;
    int rc;

    char *mcast_groups[] = {
        "226.0.0.1",
        "226.0.0.2",
        NULL
    };
#if STATS_PATCH 
    struct stat stat_buf;
#endif  

    struct ip_mreq imreq;

    FD_ZERO(&master);
    FD_ZERO(&read_fds);

    rc = sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    if(rc == -1)
    {
        PPANIC("socket() failed");
    }

    rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    if(rc == -1)
    {
        PPANIC("setsockopt(reuse) failed");
    }

    memset(&serveraddr, 0, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons(PORT);
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

    rc = bind(sock, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
    if(rc == -1)
    {
        PPANIC("bind() failed");
    }

    rc = setsockopt(sock, IPPROTO_IP, IP_PKTINFO, &opt, sizeof(opt));
    if(rc == -1)
    {
        PPANIC("setsockopt(IP_PKTINFO) failed");
    }

    for (i = 0; mcast_groups[i] != NULL; i++)
    {
        imreq.imr_multiaddr.s_addr = inet_addr(mcast_groups[i]);
        imreq.imr_interface.s_addr = INADDR_ANY;
        rc = setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&imreq, sizeof(struct ip_mreq));
        if (rc != 0)
        {
            PPANIC("joing mcast group failed");
        }
    }

    FD_SET(sock, &master);

    while(1)
    {
        read_fds = master;
        rc = select(sock + 1, &read_fds, NULL, NULL, NULL);

        if (rc == 0)
        {
            continue;
        }

        if(rc == -1)
        {
            PPANIC("select() failed");
        }

        if(FD_ISSET(sock, &read_fds))
        {
            char buf[1024];
            int inb;
            char ctrl_msg_buf[1024];
            struct iovec iov[1];
            iov[0].iov_base = buf;
            iov[0].iov_len = 1024;
            struct msghdr msg_hdr = {
                .msg_iov = iov,
                .msg_iovlen = 1,
                .msg_name = NULL,
                .msg_namelen = 0,
                .msg_control = ctrl_msg_buf,
                .msg_controllen = sizeof(ctrl_msg_buf),
            };
            struct cmsghdr *ctrl_msg_hdr;

            inb = recvmsg(sock, &msg_hdr, 0);
            if (inb < 0)
            {
                PPANIC("recvmsg() failed");
            }

            for (ctrl_msg_hdr = CMSG_FIRSTHDR(&msg_hdr); ctrl_msg_hdr != NULL; ctrl_msg_hdr = CMSG_NXTHDR(&msg_hdr, ctrl_msg_hdr))
            {
                if (ctrl_msg_hdr->cmsg_level == IPPROTO_IP && ctrl_msg_hdr->cmsg_type == IP_PKTINFO)
                {
                    struct in_pktinfo *pckt_info = (struct in_pktinfo *)CMSG_DATA(ctrl_msg_hdr);
                    printf("got data for mcast group: %s\n", inet_ntoa(pckt_info->ipi_addr));
                    break;
                }
            }

            printf("|");
            for (i = 0; i < inb; i++)
                printf("%c", isprint(buf[i])?buf[i]:'?');
            printf("|\n");
#if STATS_PATCH
            rc = fstat(sock, &stat_buf);
            if (rc == -1)
            {
                perror("fstat() failed");
            } else {
                printf("st_atime: %d\n", stat_buf.st_atime);
                printf("st_mtime: %d\n", stat_buf.st_mtime);
                printf("st_ctime: %d\n", stat_buf.st_ctime);
            }
#endif
        }
    }

    return 0;
}

下面的代码不会解决 OPs 问题,但可以指导处理类似需求的人

(编辑)一个人不应该在深夜做这样的事情......即使使用该解决方案,您也只会获得 select 处理 fd 的顺序 - 这不会给您任何有关帧到达时间的指示。

如此处所述目前无法检索套接字的顺序或它们更改的时间戳,因为未为套接字 inode 设置所需的回调。但是如果你能够修补你的内核,你可以通过在 select 系统调用中设置时间来解决这个问题。

以下补丁可能会给您一个想法:

diff --git a/fs/select.c b/fs/select.c
index 467bb1c..3f2927e 100644
--- a/fs/select.c
+++ b/fs/select.c
@@ -435,6 +435,9 @@ int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
        for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
            unsigned long in, out, ex, all_bits, bit = 1, mask, j;
            unsigned long res_in = 0, res_out = 0, res_ex = 0;
+           struct timeval tv;
+           
+           do_gettimeofday(&tv);

            in = *inp++; out = *outp++; ex = *exp++;
            all_bits = in | out | ex;
@@ -452,6 +455,16 @@ int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
                f = fdget(i);
                if (f.file) {
                    const struct file_operations *f_op;
+                   struct kstat stat;
+                   
+                   int ret;
+                   u8 is_sock = 0;
+
+                   ret = vfs_getattr(&f.file->f_path, &stat);
+                   if(ret == 0 && S_ISSOCK(stat.mode)) {
+                       is_sock = 1;
+                   }
+                   
                    f_op = f.file->f_op;
                    mask = DEFAULT_POLLMASK;
                    if (f_op->poll) {
@@ -464,16 +477,22 @@ int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
                        res_in |= bit;
                        retval++;
                        wait->_qproc = NULL;
+                       if(is_sock && f.file->f_inode)
+                           f.file->f_inode->i_ctime.tv_sec = tv.tv_sec;
                    }
                    if ((mask & POLLOUT_SET) && (out & bit)) {
                        res_out |= bit;
                        retval++;
                        wait->_qproc = NULL;
+                       if(is_sock && f.file->f_inode)
+                           f.file->f_inode->i_ctime.tv_sec = tv.tv_sec;
                    }
                    if ((mask & POLLEX_SET) && (ex & bit)) {
                        res_ex |= bit;
                        retval++;
                        wait->_qproc = NULL;
+                       if(is_sock && f.file->f_inode)
+                           f.file->f_inode->i_ctime.tv_sec = tv.tv_sec;
                    }
                    /* got something, stop busy polling */
                    if (retval) {

笔记:

  1. 这是......只为你:) - 不要指望它出现在主线中

  2. 在测试每个相关 fd之前调用 do_gettimeofday() 。为了获得更高的粒度,这应该在每次迭代中完成(并且仅在需要时)。由于 stat-interface 仅提供一秒的粒度,您可以(!UGLY!)使用剩余时间属性将秒的小数部分映射到这些字段。

  3. 这是使用内核 3.16.0 完成的,并且没有经过很好的测试。不要在太空船或医疗设备中使用它。如果您想尝试一下,请获取文件系统映像(例如https://people.debian.org/~aurel32/qemu/amd64/debian_wheezy_amd64_standard.qcow2)并使用 qemu 对其进行测试:

    sudo qemu-system-x86_64 -kernel arch/x86/boot/bzImage -hda debian_wheezy_amd64_standard.qcow2 -append "root=/dev/sda1"

于 2016-02-06T02:01:51.260 回答
0

如果 select() 返回 > 1,则事件必须非常接近,以至于使排序问题变得毫无意义。

于 2013-07-19T22:56:54.197 回答
0

您可以使用 fstat 获取文件描述符准备就绪的时间戳。

欲了解更多信息,请阅读http://pubs.opengroup.org/onlinepubs/009695399/functions/fstat.html

于 2016-02-04T12:29:10.667 回答