2

我正在研究一个多线程 UDP 侦听器,但我遇到了一个绝对超越我的问题。

所以,我需要在多个端口接收大量的 UDP 数据包。在本地,对我来说最好的解决方案是在与我正在监听的端口一样多的线程中调用非阻塞 recvfrom(选择和轮询对于我的要求来说太慢了)。我正在使用线程池管理器,它只是调用线程和队列任务。这是代码:

void receiveFromSocket(void * arguments){

    sockaddr_in client;  // Local
    socklen_t clientSize = sizeof(client);
    memset(&client, 0, sizeof(client));
    struct arg_struct_listenPort *args2 = (struct arg_struct_listenPort *)arguments;
        int fd = args2->arg_fd;
        int port = args2->arg_port;

    for(;;) {
        char buf[158];
        memset(buf,0,158*sizeof(char));
        int n = recvfrom(fd, (char * ) buf, 158, MSG_DONTWAIT, ( struct sockaddr *) &client, &clientSize);

            if(n == -1){
               //cerr << "Error while receiving from client: " << errno << endl;
               continue;
            }

            if(n != 158){
               cerr << "Discarded message since it's not 158 bytes." << endl;
               continue;
            }
            struct arg_struct args;
                args.arg_port = port;
                memcpy(args.buf,buf,158);

            thpool_add_work(globals.thpool, socketThread, (void*)(&args));

    }

}


/// Runs the Socket listener
int network_accept_any()
{
        vector<int>::iterator i;
        for(i = globals.fds.begin(); i != globals.fds.end(); i++){
            int port = distance(globals.fds.begin(),i);
            struct arg_struct_listenPort args;
                args.arg_fd = *i;
                args.arg_port = globals.cmnSystemCatalogs[port].diag_port;
            thpool_add_work(globals.thpool, receiveFromSocket, (void*)(&args));
        }
        cout << "Listening threads created..." << endl;

    return 0;
}

这在本地工作得很好。但是当我在生产环境中编译它时,一些端口监听数据包而其他端口根本不监听!并且工作端口在每次执行中都会发生变化。我可以,确认不是防火墙问题。我也可以通过 Wireshark 清楚地看到数据包。我可以通过 netcat 在这些端口上接收数据包。Netstat 显示所有打开的端口。

我的本地环境是Ubuntu 18.04 VM,生产环境是Debian 9.8。

这是我调用套接字的方式:

int lSocket(int port) {

    //Crear Socket
        int listening = socket(AF_INET, SOCK_DGRAM, 0);
        if (listening == -1) {
            cerr << "No se puede crear el socket";
            exit(EXIT_FAILURE);
        }

        //Enlazar socket a un IP / puerto

        struct sockaddr_in hint;
        memset(&hint, 0, sizeof(hint));
        hint.sin_family = AF_INET; //IPv4
        hint.sin_port = htons(port); //Port
        hint.sin_addr.s_addr = htonl(INADDR_ANY);

        if(bind(listening, (struct sockaddr*)&hint, sizeof(hint)) == -1) { //Enlaza las opciones definidas al socket
            cerr << "No se puede enlazar IP/puerto" << endl;
            exit(EXIT_FAILURE);
        }


        return listening;

}

非常感谢任何建议!

编辑:

正如建议的那样,我尝试切换到阻塞 I/O,但主要问题仍然存在。仍然没有收到所有打开的端口。

4

1 回答 1

1

多么了不起的欢迎!

@molbdnilo 绝对正确:

您正在使用指向生命周期已结束的对象的指针 (&args)。这具有未定义的行为 - 它可能看起来有效,但它是一个需要修复的错误。

这是固定代码。向线程提供参数时必须小心!

void receiveFromSocket(void * arguments){

    sockaddr_in client;  // Local
    socklen_t clientSize = sizeof(client);
    memset(&client, 0, sizeof(client));
    struct arg_struct_listenPort *args2 = (struct arg_struct_listenPort *)arguments;
        int fd = args2->arg_fd;
        int port = args2->arg_port;

    for(;;) {
        char buf[158];
        memset(buf,0,158*sizeof(char));
        int n = recvfrom(fd, (char * ) buf, 158, MSG_WAITALL, ( struct sockaddr *) &client, &clientSize);

            if(n == -1){
               cerr << "Error while receiving from client: " << errno << endl;
               continue;
            }

            if(n != 158){
               cerr << "Discarded message since it's not 158 bytes." << endl;
               continue;
            }
            arg_struct *args = new arg_struct;
                args->arg_port = port;
                memcpy(args->buf,buf,158);

            thpool_add_work(globals.thpool, socketThread, (void*)(args));

    }

}


/// Runs the Socket listener
int network_accept_any()
{
        vector<int>::iterator i;
        for(i = globals.fds.begin(); i != globals.fds.end(); i++){
            int port = distance(globals.fds.begin(),i);
          arg_struct_listenPort *args = new arg_struct_listenPort;
                args->arg_fd = *i;
                args->arg_port = globals.cmnSystemCatalogs[port].diag_port;
            thpool_add_work(globals.thpool, receiveFromSocket, (void*)(args));
        }
        cout << "Listening threads created..." << endl;


    return 0;
}

另外,我会留意@John Bollinger 和@Superlokkus 的评论。

谢谢你们!

于 2019-03-12T13:32:59.940 回答