6

我正在做一些套接字编程实验(在 unix 环境中)。我正在尝试的是

  1. 客户端向服务器发送请求。
  2. 服务器应将客户端套接字发送给 Worker(一个独立进程)
  3. 工人应该回复客户。

这可能吗?

如果 Worker 是 Server 的子级,则此方案适用。

如果 Server 和 Worker 是独立的进程,这行得通吗?如果是的话,有人可以给我一些想法吗?是否有可用于此类场景的示例?

4

3 回答 3

12

Linux Programming Interface这本书提供了使用 Unix 域套接字在不相关进程之间发送接收文件描述符的示例。

为了好玩,我从头开始编写了自己的示例。server.c

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netdb.h>
#include <signal.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>

/* How many concurrent pending connections are allowed */
#define  LISTEN_BACKLOG     32

/* Unix domain socket path length (including NUL byte) */
#ifndef  UNIX_PATH_LEN
#define  UNIX_PATH_LEN    108
#endif

/* Flag to indicate we have received a shutdown request. */
volatile sig_atomic_t     done = 0;

/* Shutdown request signal handler, of the basic type. */
void handle_done_signal(int signum)
{
    if (!done)
        done = signum;

    return;
}

/* Install shutdown request signal handler on signal signum. */
int set_done_signal(const int signum)
{
    struct sigaction act;

    sigemptyset(&act.sa_mask);
    act.sa_handler = handle_done_signal;
    act.sa_flags = 0;

    if (sigaction(signum, &act, NULL) == -1)
        return errno;
    else
        return 0;
}

/* Return empty, -, and * as NULL, so users can use that
 * to bind the server to the wildcard address.
*/
char *wildcard(char *address)
{
    /* NULL? */
    if (!address)
        return NULL;

    /* Empty? */
    if (!address[0])
        return NULL;

    /* - or ? or * or : */
    if (address[0] == '-' || address[0] == '?' ||
        address[0] == '*' || address[0] == ':')
        return NULL;

    return address;
}


int main(int argc, char *argv[])
{
    struct addrinfo         hints;
    struct addrinfo        *list, *curr;

    int             listenfd, failure;

    struct sockaddr_un     worker;
    int             workerfd, workerpathlen;

    struct sockaddr_in6     conn;
    socklen_t         connlen;
    struct msghdr         connhdr;
    struct iovec         conniov;
    struct cmsghdr        *connmsg;
    char             conndata[1];
    char             connbuf[CMSG_SPACE(sizeof (int))];
    int             connfd;

    int             result;
    ssize_t             written;

    if (argc != 4) {
        fprintf(stderr, "\n");
        fprintf(stderr, "Usage: %s ADDRESS PORT WORKER\n", argv[0]);
        fprintf(stderr, "This creates a server that binds to ADDRESS and PORT,\n");
        fprintf(stderr, "and passes each connection to a separate unrelated\n");
        fprintf(stderr, "process using an Unix domain socket at WORKER.\n");
        fprintf(stderr, "\n");
        return (argc == 1) ? 0 : 1;
    }

    /* Handle HUP, INT, PIPE, and TERM signals,
     * so when the user presses Ctrl-C, the worker process cannot be contacted,
     * or the user sends a HUP or TERM signal, this server closes down cleanly. */
    if (set_done_signal(SIGINT) ||
        set_done_signal(SIGHUP) ||
        set_done_signal(SIGPIPE) ||
        set_done_signal(SIGTERM)) {
        fprintf(stderr, "Error: Cannot install signal handlers.\n");
        return 1;
    }

    /* Unix domain socket to the worker */
    memset(&worker, 0, sizeof worker);
    worker.sun_family = AF_UNIX;

    workerpathlen = strlen(argv[3]);
    if (workerpathlen < 1) {
        fprintf(stderr, "Worker Unix domain socket path cannot be empty.\n");
        return 1;
    } else
    if (workerpathlen >= UNIX_PATH_LEN) {
        fprintf(stderr, "%s: Worker Unix domain socket path is too long.\n", argv[3]);
        return 1;
    }

    memcpy(&worker.sun_path, argv[3], workerpathlen);
    /* Note: the terminating NUL byte was set by memset(&worker, 0, sizeof worker) above. */

    workerfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (workerfd == -1) {
        fprintf(stderr, "Cannot create an Unix domain socket: %s.\n", strerror(errno));
        return 1;
    }
    if (connect(workerfd, (const struct sockaddr *)(&worker), (socklen_t)sizeof worker) == -1) {
        fprintf(stderr, "Cannot connect to %s: %s.\n", argv[3], strerror(errno));
        close(workerfd);
        return 1;
    }

    /* Initialize the address info hints */
    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;        /* IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;    /* Stream socket */
    hints.ai_flags = AI_PASSIVE        /* Wildcard ADDRESS */
                   | AI_ADDRCONFIG          /* Only return IPv4/IPv6 if available locally */
                   | AI_NUMERICSERV        /* Port must be a number */
                   ;
    hints.ai_protocol = 0;            /* Any protocol */

    /* Obtain the chain of possible addresses and ports to bind to */
    result = getaddrinfo(wildcard(argv[1]), argv[2], &hints, &list);
    if (result) {
        fprintf(stderr, "%s %s: %s.\n", argv[1], argv[2], gai_strerror(result));
        close(workerfd);
        return 1;
    }

    /* Bind to the first working entry in the chain */
    listenfd = -1;
    failure = EINVAL;
    for (curr = list; curr != NULL; curr = curr->ai_next) {
        listenfd = socket(curr->ai_family, curr->ai_socktype, curr->ai_protocol);
        if (listenfd == -1)
            continue;

        if (bind(listenfd, curr->ai_addr, curr->ai_addrlen) == -1) {
            if (!failure)
                failure = errno;
            close(listenfd);
            listenfd = -1;
            continue;
        }

        /* Bind successfully */
        break;
    }

    /* Discard the chain, as we don't need it anymore.
     * Note: curr is no longer valid after this. */
    freeaddrinfo(list);

    /* Failed to bind? */
    if (listenfd == -1) {
        fprintf(stderr, "Cannot bind to %s port %s: %s.\n", argv[1], argv[2], strerror(failure));
        close(workerfd);
        return 1;
    }

    if (listen(listenfd, LISTEN_BACKLOG) == -1) {
        fprintf(stderr, "Cannot listen for incoming connections to %s port %s: %s.\n", argv[1], argv[2], strerror(errno));
        close(listenfd);
        close(workerfd);
        return 1;
    }

    printf("Now waiting for incoming connections to %s port %s\n", argv[1], argv[2]);
    fflush(stdout);

    while (!done) {

        memset(&conn, 0, sizeof conn);
        connlen = sizeof conn;

        connfd = accept(listenfd, (struct sockaddr *)&conn, &connlen);
        if (connfd == -1) {

            /* Did we just receive a signal? */
            if (errno == EINTR)
                continue;

            /* Report a connection failure. */
            printf("Failed to accept a connection: %s\n", strerror(errno));
            fflush(stdout);

            continue;
        }

        /* Construct the message to the worker process. */
        memset(&connhdr, 0, sizeof connhdr);
        memset(&conniov, 0, sizeof conniov);
        memset(&connbuf, 0, sizeof connbuf);

        conniov.iov_base = conndata;    /* Data payload to send */
        conniov.iov_len  = 1;        /* We send just one (dummy) byte, */
        conndata[0] = 0;        /* a zero. */

        /* Construct the message (header) */
        connhdr.msg_name       = NULL;        /* No optional address */
        connhdr.msg_namelen    = 0;        /* No optional address */
        connhdr.msg_iov        = &conniov;    /* Normal payload - at least one byte */
        connhdr.msg_iovlen     = 1;        /* Only one vector in conniov */
        connhdr.msg_control    = connbuf;    /* Ancillary data */
        connhdr.msg_controllen = sizeof connbuf;

        /* Construct the ancillary data needed to pass one descriptor. */
        connmsg = CMSG_FIRSTHDR(&connhdr);
        connmsg->cmsg_level = SOL_SOCKET;
        connmsg->cmsg_type = SCM_RIGHTS;
        connmsg->cmsg_len = CMSG_LEN(sizeof (int));
        /* Copy the descriptor to the ancillary data. */
        memcpy(CMSG_DATA(connmsg), &connfd, sizeof (int));

        /* Update the message to reflect the ancillary data length */
        connhdr.msg_controllen = connmsg->cmsg_len;

        do {
            written = sendmsg(workerfd, &connhdr, MSG_NOSIGNAL);
        } while (written == (ssize_t)-1 && errno == EINTR);
        if (written == (ssize_t)-1) {
            const char *const errmsg = strerror(errno);

            /* Lost connection to the other end? */
            if (!done) {
                if (errno == EPIPE)
                    done = SIGPIPE;
                else
                    done = -1;
            }

            printf("Cannot pass connection to worker: %s.\n", errmsg);
            fflush(stdout);

            close(connfd);

            /* Break main loop. */
            break;
        }

        /* Since the descriptor has been transferred to the other process,
         * we can close our end. */
        do {
            result = close(connfd);
        } while (result == -1 && errno == EINTR);
        if (result == -1)
            printf("Error closing leftover connection descriptor: %s.\n", strerror(errno));

        printf("Connection transferred to the worker process.\n");
        fflush(stdout);
    }

    /* Shutdown. */

    close(listenfd);
    close(workerfd);

    switch (done) {
    case SIGTERM:
        printf("Terminated.\n");
        break;

    case SIGPIPE:
        printf("Lost connection.\n");
        break;

    case SIGHUP:
        printf("Hanging up.\n");
        break;

    case SIGINT:
        printf("Interrupted; exiting.\n");
        break;

    default:
        printf("Exiting.\n");
    }

    return 0;
}

worker.c

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netdb.h>
#include <signal.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>

/* How many concurrent pending connections are allowed */
#define  LISTEN_BACKLOG     32

/* Unix domain socket path length (including NUL byte) */
#ifndef  UNIX_PATH_LEN
#define  UNIX_PATH_LEN    108
#endif

/* Flag to indicate we have received a shutdown request. */
volatile sig_atomic_t     done = 0;

/* Shutdown request signal handler, of the basic type. */
void handle_done_signal(int signum)
{
    if (!done)
        done = signum;

    return;
}

/* Install shutdown request signal handler on signal signum. */
int set_done_signal(const int signum)
{
    struct sigaction act;

    sigemptyset(&act.sa_mask);
    act.sa_handler = handle_done_signal;
    act.sa_flags = 0;

    if (sigaction(signum, &act, NULL) == -1)
        return errno;
    else
        return 0;
}

/* Helper function to duplicate file descriptors.
 * Returns 0 if success, errno error code otherwise.
*/
static int copy_fd(const int fromfd, const int tofd)
{
    int result;

    if (fromfd == tofd)
        return 0;

    if (fromfd == -1 || tofd == -1)
        return errno = EINVAL;

    do {
        result = dup2(fromfd, tofd);
    } while (result == -1 && errno == EINTR);
    if (result == -1)
        return errno;

    return 0;
}

int main(int argc, char *argv[])
{
    struct sockaddr_un     worker;
    int             workerfd, workerpathlen;
    int             serverfd, clientfd;

    pid_t             child;

    struct msghdr         msghdr;
    struct iovec         msgiov;
    struct cmsghdr        *cmsg;
    char             data[1];
    char             ancillary[CMSG_SPACE(sizeof (int))];
    ssize_t             received;

    if (argc < 3) {
        fprintf(stderr, "\n");
        fprintf(stderr, "Usage: %s WORKER COMMAND [ ARGS .. ]\n", argv[0]);
        fprintf(stderr, "This creates a worker that receives connections\n");
        fprintf(stderr, "from Unix domain socket WORKER.\n");
        fprintf(stderr, "Each connection is served by COMMAND, with the\n");
        fprintf(stderr, "connection connected to its standard input and output.\n");
        fprintf(stderr, "\n");
        return (argc == 1) ? 0 : 1;
    }

    /* Handle HUP, INT, PIPE, and TERM signals,
     * so when the user presses Ctrl-C, the worker process cannot be contacted,
     * or the user sends a HUP or TERM signal, this server closes down cleanly. */
    if (set_done_signal(SIGINT) ||
        set_done_signal(SIGHUP) ||
        set_done_signal(SIGPIPE) ||
        set_done_signal(SIGTERM)) {
        fprintf(stderr, "Error: Cannot install signal handlers.\n");
        return 1;
    }

    /* Unix domain socket */
    memset(&worker, 0, sizeof worker);
    worker.sun_family = AF_UNIX;

    workerpathlen = strlen(argv[1]);
    if (workerpathlen < 1) {
        fprintf(stderr, "Worker Unix domain socket path cannot be empty.\n");
        return 1;
    } else
    if (workerpathlen >= UNIX_PATH_LEN) {
        fprintf(stderr, "%s: Worker Unix domain socket path is too long.\n", argv[1]);
        return 1;
    }

    memcpy(&worker.sun_path, argv[1], workerpathlen);
    /* Note: the terminating NUL byte was set by memset(&worker, 0, sizeof worker) above. */

    workerfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (workerfd == -1) {
        fprintf(stderr, "Cannot create an Unix domain socket: %s.\n", strerror(errno));
        return 1;
    }
    if (bind(workerfd, (const struct sockaddr *)(&worker), (socklen_t)sizeof worker) == -1) {
        fprintf(stderr, "%s: %s.\n", argv[1], strerror(errno));
        close(workerfd);
        return 1;
    }
    if (listen(workerfd, LISTEN_BACKLOG) == -1) {
        fprintf(stderr, "%s: Cannot listen for messages: %s.\n", argv[1], strerror(errno));
        close(workerfd);
        return 1;
    }

    printf("Listening for descriptors on %s.\n", argv[1]);
    fflush(stdout);

    while (!done) {

        serverfd = accept(workerfd, NULL, NULL);
        if (serverfd == -1) {

            if (errno == EINTR)
                continue;

            printf("Failed to accept a connection from the server: %s.\n", strerror(errno));
            fflush(stdout);
            continue;
        }

        printf("Connection from the server.\n");
        fflush(stdout);

        while (!done && serverfd != -1) {

            memset(&msghdr, 0, sizeof msghdr);
            memset(&msgiov, 0, sizeof msgiov);

            msghdr.msg_name       = NULL;
            msghdr.msg_namelen    = 0;
            msghdr.msg_control    = &ancillary;
            msghdr.msg_controllen = sizeof ancillary;

            cmsg = CMSG_FIRSTHDR(&msghdr);
            cmsg->cmsg_level = SOL_SOCKET;
            cmsg->cmsg_type = SCM_RIGHTS;
            cmsg->cmsg_len = CMSG_LEN(sizeof (int));

            msghdr.msg_iov    = &msgiov;
            msghdr.msg_iovlen = 1;

            msgiov.iov_base    = &data;
            msgiov.iov_len = 1; /* Just one byte */

            received = recvmsg(serverfd, &msghdr, 0);

            if (received == (ssize_t)-1) {
                if (errno == EINTR)
                    continue;

                printf("Error receiving a message from server: %s.\n", strerror(errno));
                fflush(stdout);
                break;
            }

            cmsg = CMSG_FIRSTHDR(&msghdr);
            if (!cmsg || cmsg->cmsg_len != CMSG_LEN(sizeof (int))) {
                printf("Received a bad message from server.\n");
                fflush(stdout);
                break;
            }

            memcpy(&clientfd, CMSG_DATA(cmsg), sizeof (int));

            printf("Executing command with descriptor %d: ", clientfd);
            fflush(stdout);

            child = fork();
            if (child == (pid_t)-1) {
                printf("Fork failed: %s.\n", strerror(errno));
                fflush(stdout);
                close(clientfd);
                break;
            }

            if (!child) {
                /* This is the child process. */

                close(workerfd);
                close(serverfd);

                if (copy_fd(clientfd, STDIN_FILENO) ||
                    copy_fd(clientfd, STDOUT_FILENO) ||
                    copy_fd(clientfd, STDERR_FILENO))
                    return 126; /* Exits the client */

                if (clientfd != STDIN_FILENO &&
                    clientfd != STDOUT_FILENO &&
                    clientfd != STDERR_FILENO)
                    close(clientfd);

                execvp(argv[2], argv + 2);

                return 127; /* Exits the client */
            }

            printf("Done.\n");
            fflush(stdout);

            close(clientfd);
        }

        close(serverfd);

        printf("Closed connection to server.\n");
        fflush(stdout);        
    }

    /* Shutdown. */
    close(workerfd);

    switch (done) {
    case SIGTERM:
        printf("Terminated.\n");
        break;

    case SIGPIPE:
        printf("Lost connection.\n");
        break;

    case SIGHUP:
        printf("Hanging up.\n");
        break;

    case SIGINT:
        printf("Interrupted; exiting.\n");
        break;

    default:
        printf("Exiting.\n");
    }

    return 0;
}

您可以使用编译它们

gcc -W -Wall -O3 worker.c -o worker
gcc -W -Wall -O3 server.c -o server

并使用例如运行

rm -f connection
./worker connection  /bin/date &
./server 127.0.0.1 8000 connection &

如您所见,./worker./server进程是完全分开的。我建议从不同的窗口启动它们(省略&命令行末尾的 ,否则会在后台运行命令)。connection是用于传输网络连接文件描述符的 Unix 域套接字的路径或名称。这/bin/date是一个命令(不是 shell 命令,一个可执行文件),它将为每个连接执行,标准输入、输出和错误直接连接到网络客户端——非常像inetdxinetd确实如此,只是简单的框架。

您可以通过例如测试连接

nc 127.0.0.1 8000

或者

telnet 127.0.0.1 8000

上面的/bin/date命令只会将当前日期输出到标准输出,但如果你使用更聪明的工作命令,比如

rm -f connection
./worker connection printf 'HTTP/1.0 200 Ok\r\nConnection: close\r\nContent-Type: text/html; charset=UTF-8\r\nContent-Length: 67\r\n\r\n<html><head><title>Works</title></head><body>Works!</body></html>\r\n'

您可以使用浏览器 ( http://127.0.0.1:8000/) 进行测试。

该设计是这样的,它worker.c监听一个 Unix 域套接字(connection在所有上述示例命令的当前工作目录中)。它首先接受一个连接(来自单个服务器),然后期望每个传入字节与SCM_RIGHTS包含引用客户端连接的文件描述符的辅助数据相关联。如果出现问题,或者连接断开,它会返回等待来自服务器的新连接。如果它接收到客户端描述符,它会派生一个子进程,将其标准输入、输出和错误重定向到客户端描述符,并执行./worker命令行中指定的命令。父进程关闭它的客户端描述符副本,然后返回等待新的。

server.c侦听到在其命令行上指定的 IPv4 或 IPv6 地址和端口的传入连接。worker.c当它获得连接时,它通过命令行( )上指定的 Unix 域套接字将连接的文件描述符传输到上述进程connection,关闭自己的副本,然后返回等待新连接。请注意,如果服务器失去与工作人员的连接,它会中止;你会想./worker总是在./server.

两者都server.c安装worker.c简单的信号处理程序,以便您可以通过向它们发送 HUP 或 INT 信号来告诉它们退出(Ctrl-C,如果您在单独的终端或 shell 中在前台运行命令)。他们也有合理的错误检查,所以当他们退出时,他们会告诉你确切的原因。老实说,我这样做是因为这样你偶尔会收到 EINTR 错误,除非你正确对待它们(重试相关的系统调用,除非被要求退出),你的进程将是脆弱的,并且由于条件的最轻微变化而崩溃。健壮;这并不难,而且结果对用户/系统管理员更友好。

我希望你觉得代码很有趣。如果您对细节有任何疑问,我很乐意详细说明。请记住,我是在很短的时间内从头开始编写的,它只是一个简单的例子。有很大的改进空间。

于 2012-09-15T02:39:31.537 回答
2

UNIX 套接字用于在进程之间传递文件描述符。

于 2012-09-14T13:01:10.080 回答
1

根据这篇文章应该是可能的。您需要某种方式(想到管道或套接字)让您的工作进程知道套接字句柄。

不幸的是,我对 unix 编程没有经验,所以我不能给你更具体的信息。

于 2012-09-14T13:09:30.097 回答