3

I'm trying to learn how to work with fork() to create new processes and pipes to communicate with each process. Let's say I have a list that contains 20 words, and I create 3 processes. Now, I need to distribute the words between the processes using pipes, and each process will sort the list of words it receives. The way I want to achieve this is like this:

Word1 => Process1
Word2 => Process2
Word3 => Process3
Word4 => Process1
Word5 => Process2
Word6 => Process3
.
.
.

So each process will have a list of words to sort, and eventually I'll use MergeSort to merge all the sorted lists returned by each process. I'm not sure how to use pipes to communicate with each process (e.g. feed each process with a word). Any help that would put me on the right track would be appreciated.

4

3 回答 3

7

试试下面这段代码。它使用固定数量的子进程,但您可以通过调整枚举常量 NUM_KIDS 来更改该数量(主要是在设置为 3 的情况下测试的;我后来将其改为 5,只是为了确认仍能正常工作)。

#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <unistd.h>

/* Parent-side bookkeeping for one child process. */
typedef struct Child
{
    FILE *fp_to;    /* Stream the parent writes to; feeds the child's standard input */
    FILE *fp_from;  /* Stream the parent reads from; carries the child's standard output */
    pid_t pid;      /* Process ID returned by fork() for this child */
} Child;

enum { P_READ, P_WRITE };   /* Read, write descriptor of a pipe */
enum { MAX_LINE = 4096 };   /* Size of every line buffer handed to fgets() */

/* Forward declarations of functions that are used before they are defined */
static void be_childish(void);
static void distribute(size_t nkids, Child *kids);
static void err_exit(const char *fmt, ...);
static void merge(size_t nkids, Child *kids);
static void wait_for_kids(size_t nkids, Child *kids);

/*
 * Fork one child connected to the parent by two pipes.
 *
 * In the child, the read end of the first pipe becomes standard input and
 * the write end of the second pipe becomes standard output; the child then
 * runs be_childish() and never returns from this function.
 *
 * In the parent, kid->pid is the child's process ID and kid->fp_to /
 * kid->fp_from are stdio streams wrapped around the parent's ends of the
 * two pipes.
 *
 * Returns 0 on success, or -1 if a pipe, the fork, or a stream could not be
 * created.  On failure every descriptor or stream opened here has been
 * closed again; if the child was already forked it sees end-of-file on its
 * standard input.
 */
static int make_kid(Child *kid)
{
    int pipe1[2];   /* From parent to child */
    int pipe2[2];   /* From child to parent */

    if (pipe(pipe1) != 0)
        return -1;
    if (pipe(pipe2) != 0)
    {
        close(pipe1[P_READ]);
        close(pipe1[P_WRITE]);
        return -1;
    }

    kid->pid = fork();
    if (kid->pid < 0)
    {
        close(pipe1[P_READ]);
        close(pipe1[P_WRITE]);
        close(pipe2[P_READ]);
        close(pipe2[P_WRITE]);
        return -1;
    }

    if (kid->pid == 0)
    {
        enum { FALLBACK_MAX_FD = 1024 };    /* Used if the limit is indeterminate */
        long max_fd;

        /* Reads standard input from parent; writes standard output to parent */
        if (dup2(pipe1[P_READ], STDIN_FILENO) < 0 ||
            dup2(pipe2[P_WRITE], STDOUT_FILENO) < 0)
            err_exit("Failed to redirect standard I/O in child %d\n", (int)getpid());
        close(pipe1[P_READ]);
        close(pipe1[P_WRITE]);
        close(pipe2[P_READ]);
        close(pipe2[P_WRITE]);

        /*
         * The child also inherited the parent's ends of the pipes that
         * belong to previously created siblings.  While this process holds
         * a copy of a sibling's input-pipe write end, that sibling cannot
         * see end-of-file, which can deadlock the whole pipeline once the
         * data no longer fits in the pipe buffers.  Nothing above stderr is
         * needed here, so close it all.
         */
        max_fd = sysconf(_SC_OPEN_MAX);
        if (max_fd < 0)
            max_fd = FALLBACK_MAX_FD;
        for (long fd = STDERR_FILENO + 1; fd < max_fd; fd++)
            close((int)fd);

        be_childish();
        exit(0);
    }

    /* Parent: drop the child's ends, wrap our ends in stdio streams */
    close(pipe1[P_READ]);
    close(pipe2[P_WRITE]);
    kid->fp_to   = fdopen(pipe1[P_WRITE], "w");
    kid->fp_from = fdopen(pipe2[P_READ], "r");
    if (kid->fp_to == 0 || kid->fp_from == 0)
    {
        /* fclose() a stream that was created, close() a bare descriptor */
        if (kid->fp_to != 0)
            fclose(kid->fp_to);
        else
            close(pipe1[P_WRITE]);
        if (kid->fp_from != 0)
            fclose(kid->fp_from);
        else
            close(pipe2[P_READ]);
        kid->fp_to = 0;
        kid->fp_from = 0;
        return -1;
    }
    return 0;
}

/*
 * Entry point: start NUM_KIDS children, deal the lines of standard input
 * out to them, merge the lines they send back onto standard output, and
 * finally reap the children.
 */
int main(void)
{
    enum { NUM_KIDS = 5 };
    Child kids[NUM_KIDS];
    struct sigaction act;
    int i = 0;

    /* Default disposition for SIGCHLD */
    act.sa_handler = SIG_DFL;
    act.sa_flags   = 0;
    sigfillset(&act.sa_mask);
    sigaction(SIGCHLD, &act, 0);

    while (i < NUM_KIDS)
    {
        if (make_kid(&kids[i]) != 0)
            err_exit("Fault starting child %d\n", i);
        i++;
    }

    distribute(NUM_KIDS, kids);
    merge(NUM_KIDS, kids);
    wait_for_kids(NUM_KIDS, kids);

    return 0;
}

/*
 * Print a printf-style message to standard error, then terminate the
 * process with exit status 1.  Does not return.
 *
 * fmt - printf-style format string; the remaining arguments are its values.
 */
static void err_exit(const char *fmt, ...)
{
    va_list ap;

    va_start(ap, fmt);
    (void)vfprintf(stderr, fmt, ap);
    va_end(ap);

    exit(1);
}

/*
 * qsort() comparison callback for an array of C strings.
 *
 * v1 and v2 each point at an array element (that is, at a char *); the
 * strings those elements point to are compared with strcmp().
 *
 * Returns a value less than, equal to, or greater than zero, as strcmp().
 */
static int qs_compare(const void *v1, const void *v2)
{
    /* Convert without a cast so the const qualifier is never discarded */
    const char *const *lhs = v1;
    const char *const *rhs = v2;

    return strcmp(*lhs, *rhs);
}

/*
 * Return a malloc()ed copy of str, including its terminating NUL.
 * Calls err_exit() rather than returning if the allocation fails.
 */
static char *estrdup(const char *str)
{
    size_t nbytes = strlen(str) + 1;    /* +1 for the terminating NUL */
    char *copy = malloc(nbytes);

    if (copy == 0)
        err_exit("Out of memory!\n");
    return memcpy(copy, str, nbytes);
}

/*
 * Body of a child process: read lines from standard input until
 * end-of-file, sort them in strcmp() order, write them to standard output,
 * and exit with status 0.  Does not return.
 *
 * fgets() reads at most MAX_LINE - 1 characters at a time, so a longer
 * input line is stored (and sorted) as several pieces.
 *
 * Any allocation or write failure ends the process through err_exit().
 */
static void be_childish(void)
{
    char **lines = 0;
    size_t num_lines = 0;
    size_t max_lines = 0;
    char input[MAX_LINE];

    while (fgets(input, sizeof(input), stdin) != 0)
    {
        if (num_lines >= max_lines)
        {
            /* Grow geometrically: 0 -> 2 -> 6 -> 14 -> ... */
            size_t n = (2 * max_lines + 2);
            void *space = realloc(lines, n * sizeof(char *));
            if (space == 0)
                err_exit("Out of memory!\n");
            lines = space;
            max_lines = n;
        }
        lines[num_lines++] = estrdup(input);
    }

    /*
     * With no input, lines is still a null pointer, and qsort() requires a
     * valid base pointer even for zero elements; 0 or 1 lines need no
     * sorting anyway.
     */
    if (num_lines > 1)
        qsort(lines, num_lines, sizeof(char *), qs_compare);

    for (size_t i = 0; i < num_lines; i++)
    {
        if (fputs(lines[i], stdout) == EOF)
            err_exit("Short write to parent from %d\n", (int)getpid());
    }

    /*
     * fputs() may only have filled the stdio buffer; flush explicitly so a
     * failed write is reported instead of being silently lost in exit().
     */
    if (fflush(stdout) != 0)
        err_exit("Short write to parent from %d\n", (int)getpid());

    exit(0);
}

/*
 * Deal the lines of standard input out to the children round-robin
 * (line 1 to kids[0], line 2 to kids[1], ..., wrapping around), then close
 * every kids[i].fp_to and set it to a null pointer.
 *
 * nkids - number of entries in kids; with 0 the function returns at once
 *         without reading any input.
 * kids  - children whose fp_to streams are open for writing.
 *
 * A failed write, including one reported only when the stream is closed,
 * ends the process through err_exit().
 */
static void distribute(size_t nkids, Child *kids)
{
    char   input[MAX_LINE];
    size_t n = 0;

    if (nkids == 0)
        return;     /* No child to write to; kids[0] would be out of bounds */

    while (fgets(input, sizeof(input), stdin) != 0)
    {
        if (fputs(input, kids[n].fp_to) == EOF)
            err_exit("Short write to child %d\n", (int)kids[n].pid);
        if (++n >= nkids)
            n = 0;
    }

    /* Close pipes to children - lets them get on with sorting */
    for (size_t i = 0; i < nkids; i++)
    {
        FILE *fp = kids[i].fp_to;

        kids[i].fp_to = 0;
        /* fclose() flushes the buffered data, so this is where errors show */
        if (fclose(fp) == EOF)
            err_exit("Short write to child %d\n", (int)kids[i].pid);
    }
}

/*
 * Fetch the next line from a child into buffer.
 *
 * On success *length is set to the string length of the line read.  When
 * fgets() returns a null pointer (end-of-file or error), buffer is set to
 * the empty string, *length to -1, and kid->fp_from is closed and set to a
 * null pointer.
 */
static void read_line(Child *kid, char *buffer, size_t maxlen, int *length)
{
    if (fgets(buffer, maxlen, kid->fp_from) == 0)
    {
        /* Nothing more from this child: retire its stream */
        fclose(kid->fp_from);
        kid->fp_from = 0;
        buffer[0] = '\0';
        *length = -1;
        return;
    }

    /* fgets() has already NUL-terminated the buffer */
    *length = strlen(buffer);
}

/*
 * Report whether any of the first nkids entries of lengths is positive.
 * Returns 1 if at least one entry is greater than zero, otherwise 0.
 */
static int not_all_done(size_t nkids, int *lengths)
{
    size_t k = 0;

    /* Skip over entries that are zero or negative */
    while (k < nkids && lengths[k] <= 0)
        k++;

    return k < nkids;
}

/*
 * Pick the smallest pending line, in strcmp() order, among the children
 * that still have one (len[i] > 0), copy it to output, and refill that
 * child's slot with its next line via read_line().
 *
 * nkids  - number of children / entries in len, lines and kids.
 * len    - per-child length of the pending line; <= 0 means none pending.
 * lines  - per-child buffer holding the pending line.
 * maxlen - size of each lines[i] buffer, passed on to read_line().
 * kids   - the children to read replacement lines from.
 * output - receives the chosen line; assumed by the strcpy() below to be at
 *          least as large as the lines[i] buffers.
 *
 * If no child has a pending line, output is set to the empty string and
 * nothing is read.
 */
static void min_line(size_t nkids, int *len, char **lines, size_t maxlen,
                     Child *kids, char *output)
{
    size_t  min_kid = 0;
    char   *min_str = 0;

    for (size_t i = 0; i < nkids; i++)
    {
        if (len[i] <= 0)
            continue;
        if (min_str == 0 || strcmp(min_str, lines[i]) > 0)
        {
            min_str = lines[i];
            min_kid = i;
        }
    }

    if (min_str == 0)
    {
        /* Nothing pending: avoid strcpy() from a null pointer */
        output[0] = '\0';
        return;
    }

    strcpy(output, min_str);
    read_line(&kids[min_kid], lines[min_kid], maxlen, &len[min_kid]);
}

/*
 * Merge the line streams coming back from the children onto standard
 * output: keep one pending line per child and repeatedly emit the smallest
 * one (see min_line()) until no child has a line left.
 *
 * nkids - number of entries in kids; with 0 there is nothing to merge.
 * kids  - children whose fp_from streams are open for reading.
 *
 * A failed write to standard output ends the process through err_exit().
 */
static void merge(size_t nkids, Child *kids)
{
    if (nkids == 0)
        return;     /* The arrays below must not have zero length */

    char line_data[nkids][MAX_LINE];
    char *lines[nkids];
    int  len[nkids];
    char output[MAX_LINE];

    for (size_t i = 0; i < nkids; i++)
        lines[i] = line_data[i];

    /* Preload first line from each kid */
    for (size_t i = 0; i < nkids; i++)
        read_line(&kids[i], lines[i], MAX_LINE, &len[i]);

    while (not_all_done(nkids, len))
    {
        min_line(nkids, len, lines, MAX_LINE, kids, output);
        if (fputs(output, stdout) == EOF)
            err_exit("Short write to standard output\n");
    }

    /* Buffered data is only really written here, so check it too */
    if (fflush(stdout) != 0)
        err_exit("Short write to standard output\n");
}

/*
 * Reap child processes until waitpid() returns -1, marking each reaped
 * child by setting its kids[i].pid to -1.  Afterwards, any entry whose pid
 * is still not -1 is reported through err_exit().
 */
static void wait_for_kids(size_t nkids, Child *kids)
{
    int status;
    int pid;

    for (;;)
    {
        pid = waitpid(-1, &status, 0);
        if (pid == -1)
            break;

        /* Tick off the child that just terminated */
        for (size_t k = 0; k < nkids; k++)
        {
            if (kids[k].pid == pid)
                kids[k].pid = -1;
        }
    }

    /* This check loop is not really necessary */
    for (size_t k = 0; k < nkids; k++)
    {
        if (kids[k].pid == -1)
            continue;
        err_exit("Child %d died without being tracked\n", (int)kids[k].pid);
    }
}
于 2012-11-04T03:02:59.427 回答
4

整体思路通常是:

/*
 * Sketch: one parent-to-child pipe per child.  The "..." lines stand for
 * code that is left out; `line` and `max` are the child's read buffer and
 * its size, declared elsewhere.
 */
pid_t pid[3];   /* process ID of each child */
int fd[3][2];   /* fd[i][0] is the read end, fd[i][1] the write end */

int i, k;
for (i = 0; i < 3; ++i) {
    /* create the pipe */
    if (pipe(fd[i]) < 0) {
            perror("pipe error");
            exit(1);
    }

   /* fork the child */
   pid[i] = fork();
   if (pid[i] < 0) {
       perror("fork error");
       exit(1);
   } else if (pid[i] > 0) {
       /* in parent process */
       /* close reading end */
       close(fd[i][0]);
   } else {
       /* in child process */
       /* close writing end */
       close(fd[i][1]);
       /* also close the write ends inherited from earlier iterations,
          otherwise those children never see end-of-file */
       for (k = 0; k < i; ++k)
           close(fd[k][1]);
       /* read from parent */
       read(fd[i][0], line, max);
       ...
    }
}

/* in parent process */
char words[100][10] = {...};
int j, child = 0;
/* for all words */
for (j = 0; j < 100; ++j) {
    /* write to child */
    if (write(fd[child][1], words[j], strlen(words[j])) < 0)
        perror("write error");
    ...
    ++child;
    if (child >= 3)
        child = 0;
}

照样再创建一组管道,以便子进程向父进程回传数据。当父进程和子进程试图同时进行双向通信时,请注意不要陷入死锁。

于 2012-11-03T22:15:55.867 回答
2

管道没有什么神奇之处——它们只是一个有两个端点的通信媒介。逻辑大致如下:

创建 3 个管道,并在父进程中保留每个管道的一端。调用 fork 三次,让每个子进程持有对应管道的另一端。然后子进程进入读取循环,等待输入并写回输出。父进程可以先依次向所有管道写出数据,再依次读回结果。这不是最好的策略,但它是迄今为止最简单的。即:

while there is work left to do:
   for i in 1..3
       write current work unit to pipe[i]

   for i in 1..3
       read back response from pipe[i]

给定的孩子看起来像这样:

while(input = read from pipe)
    result = do work on input
    write result to pipe

下一步是在父进程中以异步、非阻塞的方式回读(可以使用 select,或者干脆用忙等待的轮询循环)。这要求子进程报告它返回的是哪个任务的结果,因为返回顺序可能被打乱(即,您不能再假定最先发出的工作单元会最先得到响应)。欢迎来到并发错误的有趣世界。

鉴于您的问题的性质有些不明确,我希望这在某种程度上有用。

于 2012-11-03T22:12:26.220 回答