1

我正在比较 AF-XDP 套接字和 Linux 套接字,它们可以处理多少个数据包而不会丢失数据包(数据包丢失定义为当前数据包的 RTP 序列号不等于以前的数据包+ 1)。

我注意到我的 AF-XDP 套接字程序(我无法确定这个问题是否与内核程序或用户空间程序有关)~25每秒丢失大约数据390.000包,而具有通用 linux 套接字的等效程序不会丢失任何数据包。

我实现了一个所谓的distributor程序,它加载 XDP 内核程序一次,设置一个通用 linux 套接字,并setsockopt(IP_ADD_MEMBERSHIP)为我通过命令行传递给程序的每个多播地址添加到这个通用套接字。在此之后,distributor加载BPF_MAP_TYPE_HASH放置在 XDP 内核程序中的文件描述符并插入流量路由,以防单个 AF-XDP 套接字稍后需要共享其 umem。

然后,XDP 内核程序检查每个 IPv4/UDP 数据包是否在该哈希映射中存在条目。这基本上看起来像这样:

const struct pckt_idntfy_raw raw = {
    .src_ip = 0, /* not used at the moment */
    .dst_ip = iph->daddr,
    .dst_port = udh->dest,
    .pad = 0
};

const int *idx = bpf_map_lookup_elem(&xdp_packet_mapping, &raw);

if(idx != NULL) {
    if (bpf_map_lookup_elem(&xsks_map, idx)) {
        bpf_printk("Found socket @ index: %d!\n", *idx);
        return bpf_redirect_map(&xsks_map, *idx, 0);
    } else {
        bpf_printk("Didn't find connected socket for index %d!\n", *idx);
    }
}

如果idx存在,这意味着在BPF_MAP_TYPE_XSKMAP.

在完成所有这些之后,通过传递应该由该进程处理的所有多播地址(包括目标端口)distributor产生一个新进程(一个进程处理一个 RX 队列)。fork()如果没有足够的 RX-Queue,某些进程可能会收到多个多播地址。这意味着他们将使用SHARED UMEM.

我基本上将我的 AF-XDP 用户空间程序定位在这个示例代码上:https ://github.com/torvalds/linux/blob/master/samples/bpf/xdpsock_user.c

我正在使用相同的xsk_configure_umem,xsk_populate_fill_ringxsk_configure_socket功能。

因为我认为我不需要此应用程序的最大延迟,所以我将进程发送到睡眠指定时间(大约1 - 2ms),之后它循环通过每个 AF-XDP 套接字(大多数时候它只是一个套接字)并处理该套接字的每个接收到的数据包,验证没有丢失任何数据包:

while(!global_exit) {
    nanosleep(&spec, &remaining);

    for(int i = 0; i < cfg.ip_addrs_len; i++) {
        struct xsk_socket_info *socket = xsk_sockets[i];
        if(atomic_exchange(&socket->stats_sync.lock, 1) == 0) {
            handle_receive_packets(socket);
            atomic_fetch_xor(&socket->stats_sync.lock, 1); /* release socket-lock */
        }
    }
}

在我看来,这并没有什么太花哨的东西,但不知何故,即使我的 UMEM 接近 1GB 的 RAM,我也会在数据~25包周围丢失数据包。390.000

相比之下,我的通用 linux 套接字程序看起来像这样(简而言之):

int fd = socket(AF_INET, SOCK_RAW, IPPROTO_UDP);

/* setting some socket options */

struct sockaddr_in sin;
memset(&sin, 0, sizeof(struct sockaddr_in));
sin.sin_family = AF_INET;
sin.sin_port = cfg->ip_addrs[0]->pckt.dst_port;
inet_aton(cfg->ip_addrs[0]->pckt.dst_ip, &sin.sin_addr);

if(bind(fd, (struct sockaddr*)&sin, sizeof(struct sockaddr)) < 0) {
    fprintf(stderr, "Error on binding socket: %s\n", strerror(errno));
    return - 1;
}

ioctl(fd, SIOCGIFADDR, &intf);

-program 为每个给定的distributor多播 IP 创建一个新进程,以防使用通用 linux 套接字(因为在通用套接字中没有复杂的方法,例如 SHARED-UMEM,我不会为每个进程的多个多播流而烦恼)。后来我当然加入了多播成员:

struct ip_mreqn mreq;
memset(&mreq, 0, sizeof(struct ip_mreqn));

const char *multicast_ip = cfg->ip_addrs[0]->pckt.dst_ip;

if(inet_pton(AF_INET, multicast_ip, &mreq.imr_multiaddr.s_addr)) {
    /* Local interface address */
    memcpy(&mreq.imr_address, &cfg->ifaddr, sizeof(struct in_addr));
    mreq.imr_ifindex = cfg->ifindex;

    if(setsockopt(igmp_socket_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(struct ip_mreqn)) < 0) {
        fprintf(stderr, "Failed to set `IP_ADD_MEMBERSHIP`: %s\n", strerror(errno));
        return;
    } else {
        printf("Successfully added Membership for IP: %s\n", multicast_ip);
    }
}

并开始处理数据包(不是休眠,而是以busy-loop类似的方式):

void read_packets_recvmsg_with_latency(struct config *cfg, struct statistic *st, void *buff, const int igmp_socket_fd) {
    char ctrl[CMSG_SPACE(sizeof(struct timeval))];

    struct msghdr msg;
    struct iovec iov;
    msg.msg_control = (char*)ctrl;
    msg.msg_controllen = sizeof(ctrl);
    msg.msg_name = &cfg->ifaddr;
    msg.msg_namelen = sizeof(cfg->ifaddr);

    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    iov.iov_base = buff;
    iov.iov_len = BUFFER_SIZE;

    struct timeval time_user, time_kernel;
    struct cmsghdr *cmsg = (struct cmsghdr*)&ctrl;

    const int64_t read_bytes = recvmsg(igmp_socket_fd, &msg, 0);
    if(read_bytes == -1) {
        return;
    }

    gettimeofday(&time_user, NULL);

    if(cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_TIMESTAMP) {
        memcpy(&time_kernel, CMSG_DATA(cmsg), sizeof(struct timeval));
    }

    if(verify_rtp(cfg, st, read_bytes, buff)) {
        const double timediff = (time_user.tv_sec - time_kernel.tv_sec) * 1000000 + (time_user.tv_usec - time_kernel.tv_usec);
        if(timediff > st->stats.latency_us) {
            st->stats.latency_us = timediff;
        }
    }
}



int main(...) {
    ....
    while(!is_global_exit) {
        read_packets_recvmsg_with_latency(&cfg, &st, buffer, igmp_socket_fd);
    }
}

差不多就是这样。

请不要在所描述的用例中开始丢失我不使用的数据包SHARED UMEM,它只是一个接收多播流的单个 RX-Queue。如果我处理较小的多播流,150.000 ppsAF-XDP 解决方案不会丢失任何数据包。但这也是相反的方式 - 因为520.000 pps在同一个 RX-Queue 上(使用SHARED UMEM)我失去了12.000 pps.

有什么我想念的想法吗?

4

0 回答 0