2

一段时间以来,我的代码中一直存在错误,但还不知道如何解决它。

我想要实现的目标很简单:每个工作节点(即等级为 0 的节点)在涉及一些计算的方形结构中都有一行(由一维数组表示)。计算完成后,该行将被发送回主服务器。

出于测试目的,不涉及计算。正在发生的一切是:

  • master 将行号发送给 worker,worker 使用行号来计算相应的值
  • worker 将带有结果值的数组发回

现在,我的问题是:

  • 对于一行中的元素数量(大小 = 1006)和工人数量 > 1,所有工作都按预期工作,直到达到一定大小
  • 如果一行中的元素超过 1006,worker 无法关闭并且程序不会终止
  • 仅当我尝试将数组发送回主机时才会发生这种情况。如果我只是简单地发回一个 INT,那么一切正常(参见 doMasterTasks() 和 doWorkerTasks() 中的注释行)

基于最后一个要点,我假设必须存在一些竞争条件,只有当要发送回主节点的数组达到一定大小时才会出现。

你知道问题可能是什么吗?

编译以下代码: mpicc -O2 -std=c99 -o simple

像这样运行可执行文件: mpirun -np 3 simple <size>(例如 1006 或 1007)

这是代码:

#include "mpi.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#define MASTER_RANK 0
#define TAG_RESULT 1
#define TAG_ROW 2
#define TAG_FINISHOFF 3

int mpi_call_result, my_rank, dimension, np;

// forward declarations
void doInitWork(int argc, char **argv);
void doMasterTasks(int argc, char **argv);
void doWorkerTasks(void);
void finalize();
void quit(const char *msg, int mpi_call_result);

void shutdownWorkers() {
    printf("All work has been done, shutting down clients now.\n");
    for (int i = 0; i < np; i++) {
        MPI_Send(0, 0, MPI_INT, i, TAG_FINISHOFF, MPI_COMM_WORLD);
    }
}

void doMasterTasks(int argc, char **argv) {
    printf("Starting to distribute work...\n");
    int size = dimension;
    int * dataBuffer = (int *) malloc(sizeof(int) * size);

    int currentRow = 0;
    int receivedRow = -1;
    int rowsLeft = dimension;
    MPI_Status status;

    for (int i = 1; i < np; i++) {
        MPI_Send(&currentRow, 1, MPI_INT, i, TAG_ROW, MPI_COMM_WORLD);
        rowsLeft--;
        currentRow++;

    }

    for (;;) {
//        MPI_Recv(dataBuffer, size, MPI_INT, MPI_ANY_SOURCE, TAG_RESULT, MPI_COMM_WORLD, &status);
        MPI_Recv(&receivedRow, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

        if (rowsLeft == 0)
            break;

        if (currentRow > 1004)
            printf("Sending row %d to worker %d\n", currentRow, status.MPI_SOURCE);
        MPI_Send(&currentRow, 1, MPI_INT, status.MPI_SOURCE, TAG_ROW, MPI_COMM_WORLD);
        rowsLeft--;
        currentRow++;
    }
    shutdownWorkers();
    free(dataBuffer);
}

void doWorkerTasks() {
    printf("Worker %d started\n", my_rank);

    // send the processed row back as the first element in the colours array.
    int size = dimension;
    int * data = (int *) malloc(sizeof(int) * size);
    memset(data, 0, sizeof(size));

    int processingRow = -1;
    MPI_Status status;

    for (;;) {

        MPI_Recv(&processingRow, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        if (status.MPI_TAG == TAG_FINISHOFF) {
            printf("Finish-OFF tag received!\n");
            break;
        } else {
//            MPI_Send(data, size, MPI_INT, 0, TAG_RESULT, MPI_COMM_WORLD);
            MPI_Send(&processingRow, 1, MPI_INT, 0, TAG_RESULT, MPI_COMM_WORLD);
        }
    }

    printf("Slave %d finished work\n", my_rank);
    free(data);
}

int main(int argc, char **argv) {


    if (argc == 2) {
        sscanf(argv[1], "%d", &dimension);
    } else {
        dimension = 1000;
    }

    doInitWork(argc, argv);

    if (my_rank == MASTER_RANK) {
        doMasterTasks(argc, argv);
    } else {
        doWorkerTasks();
    }
    finalize();
}

void quit(const char *msg, int mpi_call_result) {
    printf("\n%s\n", msg);
    MPI_Abort(MPI_COMM_WORLD, mpi_call_result);
    exit(mpi_call_result);
}

void finalize() {
    mpi_call_result = MPI_Finalize();
    if (mpi_call_result != 0) {
        quit("Finalizing the MPI system failed, aborting now...", mpi_call_result);
    }
}

void doInitWork(int argc, char **argv) {
    mpi_call_result = MPI_Init(&argc, &argv);
    if (mpi_call_result != 0) {
        quit("Error while initializing the system. Aborting now...\n", mpi_call_result);
    }
    MPI_Comm_size(MPI_COMM_WORLD, &np);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
}

任何帮助是极大的赞赏!

最好的,克里斯

4

1 回答 1

6

如果您查看您的 doWorkerTasks,您会发现它们发送的数据消息与接收的数据消息完全相同;(他们又收到一个来关闭它们)。

但是你的主代码:

for (int i = 1; i < np; i++) {
    MPI_Send(&currentRow, 1, MPI_INT, i, TAG_ROW, MPI_COMM_WORLD);
    rowsLeft--;
    currentRow++;

}

for (;;) {
    MPI_Recv(dataBuffer, size, MPI_INT, MPI_ANY_SOURCE, TAG_RESULT, MPI_COMM_WORLD, &status);

    if (rowsLeft == 0)
        break;

    MPI_Send(&currentRow, 1, MPI_INT, status.MPI_SOURCE, TAG_ROW, MPI_COMM_WORLD);
    rowsLeft--;
    currentRow++;
}

发送 np-2 的数据消息比它接收的多。特别是,它只会继续接收数据,直到它没有更多要发送的数据,即使应该有 np-2 更多的数据消息未完成。将代码更改为以下内容:

int rowsLeftToSend= dimension;
int rowsLeftToReceive = dimension;

for (int i = 1; i < np; i++) {
    MPI_Send(&currentRow, 1, MPI_INT, i, TAG_ROW, MPI_COMM_WORLD);
    rowsLeftToSend--;
    currentRow++;

}

while (rowsLeftToReceive > 0) {
    MPI_Recv(dataBuffer, size, MPI_INT, MPI_ANY_SOURCE, TAG_RESULT, MPI_COMM_WORLD, &status);
    rowsLeftToReceive--;

    if (rowsLeftToSend> 0) {
        if (currentRow > 1004)
            printf("Sending row %d to worker %d\n", currentRow, status.MPI_SOURCE);
        MPI_Send(&currentRow, 1, MPI_INT, status.MPI_SOURCE, TAG_ROW, MPI_COMM_WORLD);
        rowsLeftToSend--;
        currentRow++;
    }
}

现在工作。

为什么对于较小的消息大小,代码不会死锁(注意这是死锁,而不是竞争条件;这是分布式计算中更常见的并行错误),这是大多数 MPI 实现如何工作的一个微妙细节。通常,无论接收者是否准备好,MPI 实现只是将小消息“推”到管道中,但较大的消息(因为它们在接收端占用更多存储资源)需要在发送者和接收者之间进行一些握手。(如果您想了解更多信息,请搜索 Eager vs Rendezvous 协议)。

所以对于小消息情况(在这种情况下少于 1006 个整数,并且 1 个整数也肯定有效),无论主节点是否接收到它们,工作节点都会发送它们。如果 master调用MPI_Recv(),则消息已经存在并且它会立即返回。但它没有,所以主端有待处理的消息;但没关系。主人发出了杀戮信息,所有人都退出了。

但是对于较大的消息,剩余的 send()s 必须让接收者参与才能清除,并且由于接收者永远不会这样做,剩余的工作人员会挂起。

请注意,即使对于没有死锁的小消息情况,代码也无法正常工作 - 缺少计算数据。

更新:你的有一个类似的问题shutdownWorkers

void shutdownWorkers() {
    printf("All work has been done, shutting down clients now.\n");
    for (int i = 0; i < np; i++) {
        MPI_Send(0, 0, MPI_INT, i, TAG_FINISHOFF, MPI_COMM_WORLD);
    }
}

在这里,您将发送到所有进程,包括0 级,即执行发送的进程。原则上,MPI_Send 应该死锁,因为它是阻塞发送并且没有匹配的接收已经发布。您可以在之前发布非阻塞接收以避免这种情况,但这是不必要的——排名 0 不需要让自己知道结束。所以只需将循环更改为

    for (int i = 1; i < np; i++)

tl;dr - 你的代码死锁了,因为主人没有从工人那里收到足够的消息;由于大多数 MPI 库共有的实现细节,它碰巧适用于小消息大小。

于 2011-10-22T14:58:12.593 回答