0

我正在实现一个经典的 map-reduce 程序,其中我有一个父级,它生成 N 个子级(maps)+ 1 个(reduce)。父级通过未命名的管道向 N 个子级中的每一个发送信息。映射处理请求并发送结果,一个 int,以减少。reduce 进行选择并总结从 map 到 reduce 的管道上写入的每个计数器。

最后,reduce 必须发送带有结果的信号 SIGUSR1,但我的代码多次执行此操作并且错误,因为它始终在信号处理程序中打印 o。是代码的一部分:

void reduce() {

    int answer;
    int i = 0;
    fd_set set;
    FD_ZERO(&set); //clean set of pipes

    while (1) {
        for (i = 0; i < maps_nr; i++) {
            FD_SET(fd_maps_to_reduce[i][READ], &set); 
        }
        if (select(FD_SETSIZE, &set, NULL, NULL, NULL) > 0) {
            printf("Entrou no select\n");
            for (i = 0; i < maps_nr; i++) { 
                if (FD_ISSET(fd_maps_to_reduce[i][READ], &set)) {
                    close(fd_maps_to_reduce[i][WRITE]);
                    if (read(fd_maps_to_reduce[i][READ], &answer, sizeof (int))) {
                        result += answer;
                        printf("Result in reduce =%d\n", result);
                    } else {
                        printf("Reduce failed to read from pipe from son :%d!\n", i);
                    }
                }
            }
        }//end of select
        printf("Reduce is going to send a signal with result= %d!\n", result);
        kill(getppid(), SIGUSR1);
        printf("Already send!\n");
    }
}

在父级中,在创建管道和子级之后,我有这样的东西:

(...)
signal(SIGUSR1, handle_signal);
while(exit) {
    (...)//this is a menu
    for i->N 
        send a struct to each child (through write in respective pipe)
    after the for do:
    pause();//waiting for a signal to be caught
    if (errno==EINTR)
       printf("caught sigusr1");
}

void handle_signal(int signum) {
    signal(SIGUSR1, handle_signal);
    //print results
    printf("Result: %d\n",result);
}

问题是reduce进程正确求和并正确打印,但是信号被发送了很多次,我只想要一个,即在wend向父级发送信号sigusr1,在pause()中被阻塞,然后打印全局 var 结果。

我怎样才能做到这一点?reduce有什么问题不是吗?

4

1 回答 1

1

首先,您可以像这样创建一个更好看的 select() 循环:

while (newfds = readfds, select(n, &newfds, NULL, NULL, NULL))

现在,谈谈你的问题。正如我从上面的代码中看到的那样,每次 select() unblocks 时,您都会向父级发送信号,这可能在每个映射进程中发生不止一次。select()每次您的任何map 进程向 reduce 进程发送数据时,都可以解除阻塞并运行循环中的所有其余代码。哪怕是半个答案。

如果要在减少所有内容后发送信号,则必须实现一些逻辑来检测所有进程都已完成,结束循环,然后(在循环外)向父级发送信号。

编辑:尝试这样的事情(我删除了您的代码的一些细节,以使示例更清晰)。

void reduce() {

    int i, answer, waiting, ret;
    fd_set read_set, selected_set;

    FD_ZERO(&read_set);

    for (i = 0; i < maps_nr; i++)
        FD_SET(fd_maps_to_reduce[i][READ], &read_set); 

    waiting = maps_nr; /* how many answers are we expecting? */

    while(waiting > 0 &&
          selected_set = read_set,
          select(FD_SETSIZE, &selected_set, NULL, NULL, NULL)) {

        for (i = 0; i < maps_nr; i++) {

            if (FD_ISSET(fd_maps_to_reduce[i][READ], &set)) {
                close(fd_maps_to_reduce[i][WRITE]);

                /* read your result. Once you have it: */
                FD_CLR(fd_maps_to_reduce[i][READ], &read_set);
                /* Now you won't wait for that pipe to produce data. */
                waiting--;
            }

        }
    }

    /* Now you are out of the select loop. Signal, or whatever. */

}

编辑 2:顺便说一下,您的结果可能是打印 0 因为您在这里处理不同的过程。reduce 进程有自己的结果变量副本,它不会更改主进程上的那个。您必须对其进行 IPC,如果已经为此编写了代码,则可能是另一个管道。

于 2010-10-16T04:10:23.007 回答