我们在 CentOS 6 上有一个应用程序,它调用 `recvmmsg()` 从一个多播地址一次读取 1024 个 UDP 数据包。当我们在同一台机器上运行该应用程序的多个实例(都监听相同的流量)时,这个调用有时会阻塞好几秒钟,尽管套接字是非阻塞的,并且传入了 `MSG_DONTWAIT`。它在其他所有情况下都工作正常,只在高负载(50 MB/s)下才会卡住。一旦应用程序阻塞,我们就会落后于 UDP 流量,并且再也追不上。该进程以高优先级在 RR 调度策略(`SCHED_RR`)下运行,以避免其他进程的干扰。我们也尝试过改用循环调用 `recvfrom()` 或 `recv()`,结果相同。
我们在内核源代码中能找到的唯一可能导致阻塞的地方,是 `__skb_try_recv_datagram()` 中的 `spin_lock_irqsave()`。但我不知道它在什么情况下会出问题,也不知道该如何处理才能避免阻塞,甚至不确定这是否真的是问题所在。
我不确定下一步该从哪里着手排查,任何指点都将不胜感激。
我写了一个非常简单的程序,可以在我们观察到该问题的其中一台服务器上复现它(没有贴出获取网络接口地址的函数 `getInterface()`,它应该与此问题无关;如果需要的话请告诉我)。
recv() 示例:
/*
 * Reproducer, recv() variant.
 *
 * Joins multicast group 239.255.61.255 on UDP port 4755 (bound on
 * INADDR_ANY) and, every time epoll reports the socket readable, drains up to
 * PACKETS_TO_READ datagrams with non-blocking recv() calls.  It prints how
 * many datagrams were actually read and how long the receive calls took.
 *
 * getInterface() is defined elsewhere (not shown).  Presumably it resolves
 * the local interface address for the given broadcast address and returns
 * non-zero on success -- TODO confirm.
 *
 * Returns 1 on any setup failure; otherwise loops forever.
 */
int main(void){
    /* Must be an integer constant expression: a `const int` cannot size an
     * array with static storage duration in C (it would be a static VLA). */
    enum { PACKETS_TO_READ = 1024 };

    int fd = socket(AF_INET,SOCK_DGRAM,0);
    if(fd < 0){
        printf("Failed to create socket, errno: %d.\n",errno);
        return 1;
    }
    int flags = fcntl(fd,F_GETFL,0);
    if(flags < 0 || fcntl(fd,F_SETFL, flags | O_NONBLOCK) < 0){
        printf("Failed to set O_NONBLOCK, errno: %d.\n",errno);
        return 1;
    }
    /* Allow other instances of this program to bind the same port. */
    int reuse = 1;
    if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&reuse,sizeof(reuse)) < 0){
        printf("Failed to set SO_REUSEADDR, errno: %d.\n",errno);
        return 1;
    }

    struct sockaddr_in sockaddr;
    memset(&sockaddr,0,sizeof(sockaddr));   /* also clears sin_zero */
    sockaddr.sin_family = AF_INET;
    sockaddr.sin_port = htons(4755);
    sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    if(bind(fd,(struct sockaddr*)&sockaddr,sizeof(sockaddr)) < 0){
        printf("Failed to bind.\n");
        return 1;
    }

    in_addr_t interface;
    if(!getInterface("192.168.15.255",&interface)){
        printf("Failed to get interface.\n");
        return 1;
    }
    struct ip_mreq imr;
    memset(&imr,0,sizeof(imr));
    imr.imr_multiaddr.s_addr = inet_addr("239.255.61.255");
    imr.imr_interface.s_addr = interface;
    /* s_addr is in network byte order; IN_MULTICAST expects host order. */
    if(!IN_MULTICAST(ntohl(imr.imr_multiaddr.s_addr))){
        printf("Group not in multicast.");
        return 1;
    }
    if(setsockopt(fd,IPPROTO_IP,IP_ADD_MEMBERSHIP, (char*)&imr, sizeof(imr)) < 0){
        printf("Failed to add membership, errno: %d.\n",errno);
        return 1;
    }

    /* Only one descriptor is registered, so a single event slot suffices. */
    int epollInstance = epoll_create1(0);
    if(epollInstance < 0){
        printf("Failed to create epoll instance, errno: %d.\n",errno);
        return 1;
    }
    struct epoll_event interest;
    memset(&interest,0,sizeof(interest));
    interest.events = EPOLLIN;
    interest.data.fd = fd;
    if(epoll_ctl(epollInstance,EPOLL_CTL_ADD,fd,&interest) < 0){
        printf("Failed to register socket with epoll, errno: %d.\n",errno);
        return 1;
    }

    /* These arrays only serve the commented-out recvmmsg() line below; the
     * recv() loop reuses receiveBuffer[0] for every datagram. */
    static char receiveBuffer[PACKETS_TO_READ][USHRT_MAX];
    static struct iovec iovecs[PACKETS_TO_READ];
    static struct mmsghdr msgs[PACKETS_TO_READ];
    static struct sockaddr_in sockFrom[PACKETS_TO_READ];
    for (int i = 0; i < PACKETS_TO_READ; i++) {
        iovecs[i].iov_base = receiveBuffer[i];
        iovecs[i].iov_len = USHRT_MAX;
        msgs[i].msg_hdr.msg_iov = &iovecs[i];
        msgs[i].msg_hdr.msg_iovlen = 1;
        msgs[i].msg_hdr.msg_name = &sockFrom[i];
        msgs[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_in );
    }

    struct timeval start;
    struct timeval end;
    while(1){
        struct epoll_event ready;
        int selected = epoll_wait(epollInstance,&ready,1,10);
        if(selected < 0){
            if(errno == EINTR) continue;
            printf("epoll_wait failed, errno: %d.\n",errno);
            return 1;
        }
        if(selected == 0) continue;   /* 10 ms timeout, nothing readable */

        gettimeofday(&start,NULL);
        // uncomment this line and comment out the below for loop to switch to recvmmsg, both show the issue
        // int numPackets = recvmmsg(fd,msgs,PACKETS_TO_READ,MSG_DONTWAIT,NULL);
        int numPackets = 0;
        int recvErrno = 0;
        for(int i = 0; i < PACKETS_TO_READ; i++){
            ssize_t result = recv(fd,receiveBuffer[0],USHRT_MAX,MSG_DONTWAIT);
            if(result < 0){
                /* recv() signals "queue empty" by returning -1 with errno set
                 * to EAGAIN/EWOULDBLOCK -- it never returns EAGAIN itself.
                 * Save errno so the error can be reported outside the timed
                 * window. */
                recvErrno = errno;
                break;
            }
            numPackets++;
        }
        gettimeofday(&end,NULL);

        if(recvErrno != 0 && recvErrno != EAGAIN && recvErrno != EWOULDBLOCK){
            printf("recv failed, errno: %d.\n",recvErrno);
        }
        /* Do the arithmetic in signed long: tv_usec deltas may be negative. */
        long elapsedUs = (long)(end.tv_sec - start.tv_sec) * 1000000L
                       + (long)(end.tv_usec - start.tv_usec);
        printf("Got %d packets in %ld microseconds\n",numPackets, elapsedUs);
    }
}
recvmmsg() 示例:
/*
 * Reproducer, recvmmsg() variant.
 *
 * Joins multicast group 239.255.61.255 on UDP port 4755 (bound on
 * INADDR_ANY) and, every time epoll reports the socket readable, reads up to
 * PACKETS_TO_READ datagrams with a single non-blocking recvmmsg() call.  Each
 * datagram goes into its own receiveBuffer[i] slot.  It prints how many
 * datagrams were read and how long the call took.
 *
 * getInterface() is defined elsewhere (not shown).  Presumably it resolves
 * the local interface address for the given broadcast address and returns
 * non-zero on success -- TODO confirm.
 *
 * Returns 1 on any setup failure; otherwise loops forever.
 */
int main(void){
    /* Must be an integer constant expression: a `const int` cannot size an
     * array with static storage duration in C (it would be a static VLA). */
    enum { PACKETS_TO_READ = 1024 };

    int fd = socket(AF_INET,SOCK_DGRAM,0);
    if(fd < 0){
        printf("Failed to create socket, errno: %d.\n",errno);
        return 1;
    }
    int flags = fcntl(fd,F_GETFL,0);
    if(flags < 0 || fcntl(fd,F_SETFL, flags | O_NONBLOCK) < 0){
        printf("Failed to set O_NONBLOCK, errno: %d.\n",errno);
        return 1;
    }
    /* Allow other instances of this program to bind the same port. */
    int reuse = 1;
    if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&reuse,sizeof(reuse)) < 0){
        printf("Failed to set SO_REUSEADDR, errno: %d.\n",errno);
        return 1;
    }

    struct sockaddr_in sockaddr;
    memset(&sockaddr,0,sizeof(sockaddr));   /* also clears sin_zero */
    sockaddr.sin_family = AF_INET;
    sockaddr.sin_port = htons(4755);
    sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    if(bind(fd,(struct sockaddr*)&sockaddr,sizeof(sockaddr)) < 0){
        printf("Failed to bind.\n");
        return 1;
    }

    in_addr_t interface;
    if(!getInterface("192.168.15.255",&interface)){
        printf("Failed to get interface.\n");
        return 1;
    }
    struct ip_mreq imr;
    memset(&imr,0,sizeof(imr));
    imr.imr_multiaddr.s_addr = inet_addr("239.255.61.255");
    imr.imr_interface.s_addr = interface;
    /* s_addr is in network byte order; IN_MULTICAST expects host order. */
    if(!IN_MULTICAST(ntohl(imr.imr_multiaddr.s_addr))){
        printf("Group not in multicast.");
        return 1;
    }
    if(setsockopt(fd,IPPROTO_IP,IP_ADD_MEMBERSHIP, (char*)&imr, sizeof(imr)) < 0){
        printf("Failed to add membership, errno: %d.\n",errno);
        return 1;
    }

    /* Only one descriptor is registered, so a single event slot suffices. */
    int epollInstance = epoll_create1(0);
    if(epollInstance < 0){
        printf("Failed to create epoll instance, errno: %d.\n",errno);
        return 1;
    }
    struct epoll_event interest;
    memset(&interest,0,sizeof(interest));
    interest.events = EPOLLIN;
    interest.data.fd = fd;
    if(epoll_ctl(epollInstance,EPOLL_CTL_ADD,fd,&interest) < 0){
        printf("Failed to register socket with epoll, errno: %d.\n",errno);
        return 1;
    }

    /* One buffer / iovec / source-address slot per message in the batch. */
    static char receiveBuffer[PACKETS_TO_READ][USHRT_MAX];
    static struct iovec iovecs[PACKETS_TO_READ];
    static struct mmsghdr msgs[PACKETS_TO_READ];
    static struct sockaddr_in sockFrom[PACKETS_TO_READ];
    for (int i = 0; i < PACKETS_TO_READ; i++) {
        iovecs[i].iov_base = receiveBuffer[i];
        iovecs[i].iov_len = USHRT_MAX;
        msgs[i].msg_hdr.msg_iov = &iovecs[i];
        msgs[i].msg_hdr.msg_iovlen = 1;
        msgs[i].msg_hdr.msg_name = &sockFrom[i];
        msgs[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_in );
    }

    struct timeval start;
    struct timeval end;
    while(1){
        struct epoll_event ready;
        int selected = epoll_wait(epollInstance,&ready,1,10);
        if(selected < 0){
            if(errno == EINTR) continue;
            printf("epoll_wait failed, errno: %d.\n",errno);
            return 1;
        }
        if(selected == 0) continue;   /* 10 ms timeout, nothing readable */

        gettimeofday(&start,NULL);
        /* NULL timeout: with MSG_DONTWAIT the call returns whatever is queued. */
        int numPackets = recvmmsg(fd,msgs,PACKETS_TO_READ,MSG_DONTWAIT,NULL);
        int recvErrno = errno;   /* capture before any other call can clobber it */
        gettimeofday(&end,NULL);

        if(numPackets < 0){
            /* -1 with EAGAIN/EWOULDBLOCK just means the queue was empty;
             * report anything else, and count the batch as zero packets. */
            if(recvErrno != EAGAIN && recvErrno != EWOULDBLOCK){
                printf("recvmmsg failed, errno: %d.\n",recvErrno);
            }
            numPackets = 0;
        }
        /* Do the arithmetic in signed long: tv_usec deltas may be negative. */
        long elapsedUs = (long)(end.tv_sec - start.tv_sec) * 1000000L
                       + (long)(end.tv_usec - start.tv_usec);
        printf("Got %d packets in %ld microseconds\n",numPackets, elapsedUs);
    }
}