我正在比较 AF-XDP 套接字和 Linux 套接字,它们可以处理多少个数据包而不会丢失数据包(数据包丢失定义为当前数据包的 RTP 序列号不等于以前的数据包+ 1
)。
我注意到我的 AF-XDP 套接字程序(我无法确定这个问题是否与内核程序或用户空间程序有关)~25
每秒丢失大约数据390.000
包,而具有通用 linux 套接字的等效程序不会丢失任何数据包。
我实现了一个所谓的distributor
程序,它加载 XDP 内核程序一次,设置一个通用 linux 套接字,并setsockopt(IP_ADD_MEMBERSHIP)
为我通过命令行传递给程序的每个多播地址添加到这个通用套接字。在此之后,distributor
加载BPF_MAP_TYPE_HASH
放置在 XDP 内核程序中的文件描述符并插入流量路由,以防单个 AF-XDP 套接字稍后需要共享其 umem。
然后,XDP 内核程序检查每个 IPv4/UDP 数据包是否在该哈希映射中存在条目。这基本上看起来像这样:
const struct pckt_idntfy_raw raw = {
.src_ip = 0, /* not used at the moment */
.dst_ip = iph->daddr,
.dst_port = udh->dest,
.pad = 0
};
const int *idx = bpf_map_lookup_elem(&xdp_packet_mapping, &raw);
if(idx != NULL) {
if (bpf_map_lookup_elem(&xsks_map, idx)) {
bpf_printk("Found socket @ index: %d!\n", *idx);
return bpf_redirect_map(&xsks_map, *idx, 0);
} else {
bpf_printk("Didn't find connected socket for index %d!\n", *idx);
}
}
如果idx
存在,这意味着在BPF_MAP_TYPE_XSKMAP
.
在完成所有这些之后,通过传递应该由该进程处理的所有多播地址(包括目标端口)distributor
产生一个新进程(一个进程处理一个 RX 队列)。fork()
如果没有足够的 RX-Queue,某些进程可能会收到多个多播地址。这意味着他们将使用SHARED UMEM
.
我基本上将我的 AF-XDP 用户空间程序定位在这个示例代码上:https ://github.com/torvalds/linux/blob/master/samples/bpf/xdpsock_user.c
我正在使用相同的xsk_configure_umem
,xsk_populate_fill_ring
和xsk_configure_socket
功能。
因为我认为我不需要此应用程序的最大延迟,所以我将进程发送到睡眠指定时间(大约1 - 2ms
),之后它循环通过每个 AF-XDP 套接字(大多数时候它只是一个套接字)并处理该套接字的每个接收到的数据包,验证没有丢失任何数据包:
while(!global_exit) {
nanosleep(&spec, &remaining);
for(int i = 0; i < cfg.ip_addrs_len; i++) {
struct xsk_socket_info *socket = xsk_sockets[i];
if(atomic_exchange(&socket->stats_sync.lock, 1) == 0) {
handle_receive_packets(socket);
atomic_fetch_xor(&socket->stats_sync.lock, 1); /* release socket-lock */
}
}
}
在我看来,这并没有什么太花哨的东西,但不知何故,即使我的 UMEM 接近 1GB 的 RAM,我也会在数据~25
包周围丢失数据包。390.000
相比之下,我的通用 linux 套接字程序看起来像这样(简而言之):
int fd = socket(AF_INET, SOCK_RAW, IPPROTO_UDP);
/* setting some socket options */
struct sockaddr_in sin;
memset(&sin, 0, sizeof(struct sockaddr_in));
sin.sin_family = AF_INET;
sin.sin_port = cfg->ip_addrs[0]->pckt.dst_port;
inet_aton(cfg->ip_addrs[0]->pckt.dst_ip, &sin.sin_addr);
if(bind(fd, (struct sockaddr*)&sin, sizeof(struct sockaddr)) < 0) {
fprintf(stderr, "Error on binding socket: %s\n", strerror(errno));
return - 1;
}
ioctl(fd, SIOCGIFADDR, &intf);
-program 为每个给定的distributor
多播 IP 创建一个新进程,以防使用通用 linux 套接字(因为在通用套接字中没有复杂的方法,例如 SHARED-UMEM,我不会为每个进程的多个多播流而烦恼)。后来我当然加入了多播成员:
struct ip_mreqn mreq;
memset(&mreq, 0, sizeof(struct ip_mreqn));
const char *multicast_ip = cfg->ip_addrs[0]->pckt.dst_ip;
if(inet_pton(AF_INET, multicast_ip, &mreq.imr_multiaddr.s_addr)) {
/* Local interface address */
memcpy(&mreq.imr_address, &cfg->ifaddr, sizeof(struct in_addr));
mreq.imr_ifindex = cfg->ifindex;
if(setsockopt(igmp_socket_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(struct ip_mreqn)) < 0) {
fprintf(stderr, "Failed to set `IP_ADD_MEMBERSHIP`: %s\n", strerror(errno));
return;
} else {
printf("Successfully added Membership for IP: %s\n", multicast_ip);
}
}
并开始处理数据包(不是休眠,而是以busy-loop
类似的方式):
void read_packets_recvmsg_with_latency(struct config *cfg, struct statistic *st, void *buff, const int igmp_socket_fd) {
char ctrl[CMSG_SPACE(sizeof(struct timeval))];
struct msghdr msg;
struct iovec iov;
msg.msg_control = (char*)ctrl;
msg.msg_controllen = sizeof(ctrl);
msg.msg_name = &cfg->ifaddr;
msg.msg_namelen = sizeof(cfg->ifaddr);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
iov.iov_base = buff;
iov.iov_len = BUFFER_SIZE;
struct timeval time_user, time_kernel;
struct cmsghdr *cmsg = (struct cmsghdr*)&ctrl;
const int64_t read_bytes = recvmsg(igmp_socket_fd, &msg, 0);
if(read_bytes == -1) {
return;
}
gettimeofday(&time_user, NULL);
if(cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_TIMESTAMP) {
memcpy(&time_kernel, CMSG_DATA(cmsg), sizeof(struct timeval));
}
if(verify_rtp(cfg, st, read_bytes, buff)) {
const double timediff = (time_user.tv_sec - time_kernel.tv_sec) * 1000000 + (time_user.tv_usec - time_kernel.tv_usec);
if(timediff > st->stats.latency_us) {
st->stats.latency_us = timediff;
}
}
}
int main(...) {
....
while(!is_global_exit) {
read_packets_recvmsg_with_latency(&cfg, &st, buffer, igmp_socket_fd);
}
}
差不多就是这样。
请不要在所描述的用例中开始丢失我不使用的数据包SHARED UMEM
,它只是一个接收多播流的单个 RX-Queue。如果我处理较小的多播流,150.000 pps
AF-XDP 解决方案不会丢失任何数据包。但这也是相反的方式 - 因为520.000 pps
在同一个 RX-Queue 上(使用SHARED UMEM
)我失去了12.000 pps
.
有什么我想念的想法吗?