0

我正在构建一个服务器/客户端应用程序，服务器最多需要处理 1000 多个客户端。我目前有一个简单的 EPOLL 实现，能够让服务器接收来自客户端的信息。但是，我还需要能够向特定的客户端发送任务。我的问题是：有没有办法使用 EPOLL（可能借助 EPOLLOUT 标志）来识别需要向哪些客户端发送任务，并把消息发送出去？我在下面附上了目前的代码片段。如果可行，我该如何实现？如果能用同一个 epoll 处理所有的发送和接收，那就太好了。

感谢您的任何帮助/建议!

void epoll(int listening_port)
{
    char buffer[500];       //buffer for message
    int listen_sock = 0;    //file descriptor (fd) for listening socket
    int conn_sock = 0;      //fd for connecting socket
    int epollfd = 0;         // fd for epoll
    int nfds = 0;           //number of fd's ready for i/o
    int i = 0;              //index to which file descriptor we are lookng at
    int curr_fd = 0;        //fd for socket we are currently looking at
    bool loop = 1;          //boolean value to help identify whether to keep in loop or not
    socklen_t address_len;
    struct sockaddr_in serv_addr;
    struct epoll_event ev, events[EPOLL_MAX_EVENTS];
    ssize_t result = 0;


    bzero(buffer, sizeof(buffer));
    bzero(&serv_addr, sizeof(serv_addr));

    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = listening_port;
    serv_addr.sin_addr.s_addr = INADDR_ANY;

    listen_sock = create_socket();


    if(bind(listen_sock, SA &serv_addr, sizeof(serv_addr)) != 0)
    {
        perror("Bind failed");
    }
    else
    {
        printf("Bind successful\n");
    }


    set_socket_nonblocking(listen_sock);

    listen_on_socket(listen_sock, SOMAXCONN); //specifying max connections in backlog

    epollfd = initialize_epoll();

    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
    ev.data.fd = listen_sock;

    if(epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == ERROR)
    {
        perror("Epoll_ctl: listen sock");
    }
    else
    {
        printf("Successfully added listen socket to epoll\n");
    }

    while (RUN_EPOLL)
    {
        nfds = epoll_wait(epollfd, events, EPOLL_MAX_EVENTS, 0); //waiting for incoming connection;
        if(nfds == ERROR)
        {
            perror("EPOLL_Wait");
        }
        //printf("Finished waiting\i");

        for(i = 0; i < nfds; ++i)
        {
            curr_fd = events[i].data.fd;
            loop = true; //reset looping flag
            //Notification from Listening Socket - Process Incoming Connections

            if (curr_fd == listen_sock) {

                while(loop)
                {
                    conn_sock = accept(listen_sock, SA &serv_addr, &address_len); //accept incoming connection
                    printf("Accepted new incoming connection - socket fd: %d\n", conn_sock);
                    if (conn_sock > 0) //if successful set socket nonblocking and add it to epoll
                    {

                        set_socket_nonblocking(conn_sock);

                        ev.events = EPOLLIN | EPOLLOUT| EPOLLET | EPOLLRDHUP; //setting flags
                        ev.data.fd = conn_sock; //specify fd of new connection in event to follow
                        if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock, &ev) == ERROR) //add fd to monitored fd's
                        {
                            perror("epoll_ctl: conn_sck");
                        }
                        else
                        {
                            printf("Added %d to monitor list\n", conn_sock);
                        }


                    }
                    else if (conn_sock == ERROR)
                    {
                        if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
                        {
                            printf("All incoming connections processed\n");
                            loop = false;
                        }
                        else
                        {
                            perror("Accept remote socket");
                            loop = false;
                        }
                    }
                }
            }
            else if(events[i].events & EPOLLRDHUP) //detecting if peer shutdown
            {
                printf("Detected socket peer shutdown. Closing now. \n");

                if (epoll_ctl(epollfd, EPOLL_CTL_DEL, curr_fd, NULL) == ERROR) {
                    perror("epoll_ctl: conn_sck");
                }

                close_socket(curr_fd);
            }
            else if(events[i].events & EPOLLIN)
            {
                while(loop)
                {
                    result = recv(curr_fd, buffer, sizeof(buffer), 0);
                    //printf("Length of incoming message is %d\i", result);

                    if(result > 0) //
                    {
                        printf("File Descriptor: %d. Message: %s\n", curr_fd, buffer);
                        bzero(buffer, sizeof(buffer));
                    }
                    else if(result == ERROR) //Message is completely sent
                    {

                        if(errno == EAGAIN)
                        {


                            send_message(curr_fd, "Success", strlen("Success"));
                            loop = false;

                        }
                    }
                    else if(result == 0)
                    {
                        //Removing the fd from the monitored descriptors in epoll
                        if (epoll_ctl(epollfd, EPOLL_CTL_DEL, curr_fd, NULL) == ERROR) {
                            perror("epoll_ctl: conn_sck");
                        }
                        close_socket(curr_fd); //Closing the fd
                        loop = false;
                    }

                }

            }
        }

    }

    close_socket(listen_sock);
    //need to develop way to gracefully close out of epoll

    return;

}
4

0 回答 0