1

编辑:对不起,我忘了提到我将两个 5000x5000 矩阵相乘。

这是表明当我增加进程数量时,时间也在增加的输出。那么这段代码的逻辑是否有问题。我从网上找到它,只将名称更改为 matrixMulti 及其 printf。当我连接到 Grid 实验室并增加进程数量时,这似乎合乎逻辑但无法正常工作。所以你怎么看?

终端截图

/**********************************************************************************************
 * Matrix Multiplication Program using MPI.
 *
 * Viraj Brian Wijesuriya - University of Colombo School of Computing, Sri Lanka.
 *
 * Works with any type of two matrixes [A], [B] which could be multiplied to produce a matrix [c].
 *
 * Master process initializes the multiplication operands, distributes the muliplication
 * operation to worker processes and reduces the worker results to construct the final output.
 *
 ************************************************************************************************/

#include<stdio.h>
#include<mpi.h>
#define NUM_ROWS_A 5000 //rows of input [A]
#define NUM_COLUMNS_A 5000 //columns of input [A]
#define NUM_ROWS_B 5000 //rows of input [B]
#define NUM_COLUMNS_B 5000 //columns of input [B]
#define MASTER_TO_SLAVE_TAG 1 //tag for messages sent from master to slaves
#define SLAVE_TO_MASTER_TAG 4 //tag for messages sent from slaves to master
void makeAB(); //makes the [A] and [B] matrixes
void printArray(); //print the content of output matrix [C];

int rank; //process rank
int size; //number of processes
int i, j, k; //helper variables
double mat_a[NUM_ROWS_A][NUM_COLUMNS_A]; //declare input [A]
double mat_b[NUM_ROWS_B][NUM_COLUMNS_B]; //declare input [B]
double mat_result[NUM_ROWS_A][NUM_COLUMNS_B]; //declare output [C]
double start_time; //hold start time
double end_time; // hold end time
int low_bound; //low bound of the number of rows of [A] allocated to a slave
int upper_bound; //upper bound of the number of rows of [A] allocated to a slave
int portion; //portion of the number of rows of [A] allocated to a slave
MPI_Status status; // store status of a MPI_Recv
MPI_Request request; //capture request of a MPI_Isend

int main(int argc, char *argv[])
{

    MPI_Init(&argc, &argv); //initialize MPI operations
    MPI_Comm_rank(MPI_COMM_WORLD, &rank); //get the rank
    MPI_Comm_size(MPI_COMM_WORLD, &size); //get number of processes

    /* master initializes work*/
    if (rank == 0) {
        makeAB();
        start_time = MPI_Wtime();
        for (i = 1; i < size; i++) {//for each slave other than the master
            portion = (NUM_ROWS_A / (size - 1)); // calculate portion without master
            low_bound = (i - 1) * portion;
            if (((i + 1) == size) && ((NUM_ROWS_A % (size - 1)) != 0)) {//if rows of [A] cannot be equally divided among slaves
                upper_bound = NUM_ROWS_A; //last slave gets all the remaining rows
            } else {
                upper_bound = low_bound + portion; //rows of [A] are equally divisable among slaves
            }
            //send the low bound first without blocking, to the intended slave
            MPI_Isend(&low_bound, 1, MPI_INT, i, MASTER_TO_SLAVE_TAG, MPI_COMM_WORLD, &request);
            //next send the upper bound without blocking, to the intended slave
            MPI_Isend(&upper_bound, 1, MPI_INT, i, MASTER_TO_SLAVE_TAG + 1, MPI_COMM_WORLD, &request);
            //finally send the allocated row portion of [A] without blocking, to the intended slave
            MPI_Isend(&mat_a[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_A, MPI_DOUBLE, i, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &request);
        }
    }
    //broadcast [B] to all the slaves
    MPI_Bcast(&mat_b, NUM_ROWS_B*NUM_COLUMNS_B, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    /* work done by slaves*/
    if (rank > 0) {
        //receive low bound from the master
        MPI_Recv(&low_bound, 1, MPI_INT, 0, MASTER_TO_SLAVE_TAG, MPI_COMM_WORLD, &status);
        //next receive upper bound from the master
        MPI_Recv(&upper_bound, 1, MPI_INT, 0, MASTER_TO_SLAVE_TAG + 1, MPI_COMM_WORLD, &status);
        //finally receive row portion of [A] to be processed from the master
        MPI_Recv(&mat_a[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_A, MPI_DOUBLE, 0, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &status);
        for (i = low_bound; i < upper_bound; i++) {//iterate through a given set of rows of [A]
            for (j = 0; j < NUM_COLUMNS_B; j++) {//iterate through columns of [B]
                for (k = 0; k < NUM_ROWS_B; k++) {//iterate through rows of [B]
                    mat_result[i][j] += (mat_a[i][k] * mat_b[k][j]);
                }
            }
        }
        //send back the low bound first without blocking, to the master
        MPI_Isend(&low_bound, 1, MPI_INT, 0, SLAVE_TO_MASTER_TAG, MPI_COMM_WORLD, &request);
        //send the upper bound next without blocking, to the master
        MPI_Isend(&upper_bound, 1, MPI_INT, 0, SLAVE_TO_MASTER_TAG + 1, MPI_COMM_WORLD, &request);
        //finally send the processed portion of data without blocking, to the master
        MPI_Isend(&mat_result[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_B, MPI_DOUBLE, 0, SLAVE_TO_MASTER_TAG + 2, MPI_COMM_WORLD, &request);
    }

    /* master gathers processed work*/
    if (rank == 0) {
        for (i = 1; i < size; i++) {// untill all slaves have handed back the processed data
            //receive low bound from a slave
            MPI_Recv(&low_bound, 1, MPI_INT, i, SLAVE_TO_MASTER_TAG, MPI_COMM_WORLD, &status);
            //receive upper bound from a slave
            MPI_Recv(&upper_bound, 1, MPI_INT, i, SLAVE_TO_MASTER_TAG + 1, MPI_COMM_WORLD, &status);
            //receive processed data from a slave
            MPI_Recv(&mat_result[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_B, MPI_DOUBLE, i, SLAVE_TO_MASTER_TAG + 2, MPI_COMM_WORLD, &status);
        }
        end_time = MPI_Wtime();
        printf("\nRunning Time = %f\n\n", end_time - start_time);
        printArray();
    }
    MPI_Finalize(); //finalize MPI operations
    return 0;
}

void makeAB()
{
    for (i = 0; i < NUM_ROWS_A; i++) {
        for (j = 0; j < NUM_COLUMNS_A; j++) {
            mat_a[i][j] = i + j;
        }
    }
    for (i = 0; i < NUM_ROWS_B; i++) {
        for (j = 0; j < NUM_COLUMNS_B; j++) {
            mat_b[i][j] = i*j;
        }
    }
}

void printArray()
{
    for (i = 0; i < NUM_ROWS_A; i++) {
        printf("\n");
        for (j = 0; j < NUM_COLUMNS_A; j++)
            printf("%8.2f ", mat_a[i][j]);
    }
    printf("\n\n\n");
    for (i = 0; i < NUM_ROWS_B; i++) {
        printf("\n");
        for (j = 0; j < NUM_COLUMNS_B; j++)
            printf("%8.2f ", mat_b[i][j]);
    }
    printf("\n\n\n");
    for (i = 0; i < NUM_ROWS_A; i++) {
        printf("\n");
        for (j = 0; j < NUM_COLUMNS_B; j++)
            printf("%8.2f ", mat_result[i][j]);
    }
    printf("\n\n");
}
4

3 回答 3

5

这实际上并不令人惊讶。您拥有的工人越多,您的通信开销就越多(划分工作,汇总结果),因此通常有一个最佳点,您有足够的工人可以利用并行化,但没有那么多工人通信开销开始成为一个问题。随着内核数量的增加,您从缩小工作量和增加通信开销中获得的收益递减。这就是为什么在编写并行应用程序时,需要进行大量工作来衡量多少工作人员会产生最佳性能,以及设计网络结构以最大限度地减少开销。

于 2012-12-24T10:41:32.767 回答
4

发布的代码存在一些真正的正确性问题。让我们看看从 rank 0 开始的发送循环:

    for (i = 1; i < size; i++) {
        //...
        low_bound = (i - 1) * portion;
        upper_bound = low_bound + portion; 

        MPI_Isend(&low_bound, 1, MPI_INT, i, MASTER_TO_SLAVE_TAG, MPI_COMM_WORLD, &request);
        MPI_Isend(&upper_bound, 1, MPI_INT, i, MASTER_TO_SLAVE_TAG + 1, MPI_COMM_WORLD, &request);
        MPI_Isend(&mat_a[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_A, MPI_DOUBLE, i, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &request);
    }

你不能那样做。如果您要使用非阻塞请求,您最终必须使用MPI_Wait()or MPI_Test()for 请求,以便您(和 MPI 库)可以知道它们是完整的。您需要这样做以避免泄漏资源,但更重要的是,在这种情况下,您会反复覆盖low_boundupper_bound甚至在您知道发送已经发生之前。谁知道您的工作人员任务正在获取什么数据。此外,通过每次覆盖请求绝对保证资源泄漏。

有几种方法可以解决这个问题;最简单的方法是创建一个简单的上限和下限数组,以及一个请求数组:

if (rank == 0) {
    makeAB();
    requests     = malloc(size*3*sizeof(MPI_Request));
    low_bounds   = malloc(size*sizeof(int));
    upper_bounds = malloc(size*sizeof(int));

    start_time = MPI_Wtime();
    for (i = 1; i < size; i++) {
        portion = (NUM_ROWS_A / (size - 1));
        low_bounds[i] = (i - 1) * portion;
        if (((i + 1) == size) && ((NUM_ROWS_A % (size - 1)) != 0)) {
            upper_bounds[i] = NUM_ROWS_A; 
        } else {
            upper_bounds[i] = low_bounds[i] + portion; 
        }

        MPI_Isend(&(low_bounds[i]), 1, MPI_INT, i, MASTER_TO_SLAVE_TAG, MPI_COMM_WORLD, &(requests[3*i]));
        MPI_Isend(&(upper_bounds[i]), 1, MPI_INT, i, MASTER_TO_SLAVE_TAG + 1, MPI_COMM_WORLD, &(requests[3*i+1]));
        MPI_Isend(&mat_a[low_bounds[i]][0], (upper_bounds[i] - low_bounds[i]) * NUM_COLUMNS_A, MPI_DOUBLE, i, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &(requests[3*i+2]));
    }
    MPI_Waitall(3*(size-1), &(requests[3]), MPI_STATUS_IGNORE);
    free(requests);

这样做的好处是,由于 0 级保存了这些信息,工作人员完成后不需要将其发回,0 级可以直接接收到正确的位置:

    //...
    for (i = low_bound; i < upper_bound; i++) {
        for (j = 0; j < NUM_COLUMNS_B; j++) {
            for (k = 0; k < NUM_ROWS_B; k++) {
                mat_result[i][j] += (mat_a[i][k] * mat_b[k][j]);
            }
        }
    }
    MPI_Send(&mat_result[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_B, MPI_DOUBLE, 0, SLAVE_TO_MASTER_TAG + 2, MPI_COMM_WORLD);

//...
if (rank == 0) {
    for (i = 1; i < size; i++) {
        MPI_Recv(&mat_result[low_bounds[i]][0], (upper_bounds[i] - low_bounds[i]) * NUM_COLUMNS_B, MPI_DOUBLE, i, SLAVE_TO_MASTER_TAG + 2, MPI_COMM_WORLD, &status);
    }

但是只要您有一个必须分发给所有处理器的这些值的数组,您就可以使用 MPI_Scatter 操作,这通常比您的循环发送更有效:

    for (i = 1; i < size; i++) {
        low_bounds[i] = ...
        upper_bounds[i] = ...
    }

    MPI_Scatter(low_bounds,   1, MPI_INT, &low_bound,   1, MPI_INT, 0, MPI_COMM_WORLD);
    MPI_Scatter(upper_bounds, 1, MPI_INT, &upper_bound, 1, MPI_INT, 0, MPI_COMM_WORLD);

理想情况下,您也可以使用 scatter 或其变体来分发 A 数组。

MPI_Scatter()是一个集体操作,就像MPI_Bcast(),它把我们带到你的下一个问题。在您的原始代码中,您有以下内容:

    //rank 0:
    for (i = 1; i < size; i++ ) {
        //...
        MPI_Isend();
        MPI_Isend();
        MPI_Isend();
    }

    MPI_Bcast();

    // other ranks:
    MPI_Bcast();

    MPI_Recv();
    MPI_Recv();
    MPI_Recv();

集体和点对点通信的交错可能非常危险,并可能导致死锁。这里没有必要;您应该将 Bcast 移到 Scatter 和 Recv()s 之后(现在只有 1 个 recv)。这使您的工作任务代码看起来像:

    MPI_Scatter(NULL, 1, MPI_INT, &low_bound,   1, MPI_INT, 0, MPI_COMM_WORLD);
    MPI_Scatter(NULL, 1, MPI_INT, &upper_bound, 1, MPI_INT, 0, MPI_COMM_WORLD);

    MPI_Recv(&mat_a[low_bound][0], (upper_bound - low_bound) * NUM_COLUMNS_A, MPI_DOUBLE, 0, MASTER_TO_SLAVE_TAG + 2, MPI_COMM_WORLD, &status);
    MPI_Bcast(&mat_b, NUM_ROWS_B*NUM_COLUMNS_B, MPI_DOUBLE, 0, MPI_COMM_WORLD);

这样就消除了大多数正确性问题,尽管我仍然建议使用 scatter 来分配 A 数组,然后在等待工作任务执行某些操作时使用 rank 0 来完成其“公平份额”的计算。(这意味着您的程序将适用于 size=1)。所以现在让我们看看性能问题。

对于固定的问题大小,您的程序必须:

  • 分配矩阵的上下界(2条短消息,或2个集合)
  • 分发 A 矩阵((size-1) 长消息,大小为 N^2/(size-1) 双倍)
  • 广播 B 矩阵(使用集体向所有任务发送 N^2 双打)
  • 检索 A 矩阵(与发送 A 矩阵相同)

每个任务都必须

  • 计算矩阵乘积(N^3/(size-1) 次操作)。

很容易看出,每个等级必须完成的实际计算工作量实际上随着运行的处理器数量下降为 1/(P-1),但通信工作量上升(如 P 或lg P,取决于)。在某些时候,那些交叉并在更多处理器上运行只会减慢速度。那么那个点在哪里呢?

在单个 8 核 nehalem 节点上进行快速扩展测试并使用IPM来简单计算花费时间的地方,我们有:

worker  |  running  |          |  MPI
tasks   |    time   |  Speedup |  time
--------+-----------+----------+--------
   1    |  90.85s   |     -    |  45.5s   
   2    |  45.75s   |   1.99x  |  15.4s
   4    |  23.42s   |   3.88x  |  4.93s
   6    |  15.75s   |   5.76x  |  2.51s

这实际上还不错。MPI 时间实际上几乎全部花在MPI_Recv()上,节点上表示复制矩阵片段的成本,对于 rank 0 进程,等待结果开始从工作任务返回。这表明让等级 0 做一些工作,并用收集操作替换接收的线性循环,将是有用的优化。

自然地,当您离开节点或使用更多数量的处理器时,通信成本将继续上升,并且扩展性将恶化。

更多小点:

首先,通过简单的负载平衡(如矩阵乘法)来解决紧密耦合的数值问题,主从通常是一种非常糟糕的方法。但我会假设这只是一个学习 MPI 练习,并留在那里。请注意,当然,进行基于 MPI 的矩阵乘法的正确方法是使用现有的矩阵乘法库,如SCALAPACKEigen等。

其次,全局变量的大量使用通常是没有帮助的,但这超出了这个问题的范围。我还要注意,这NUM_COLUMNS_A是必然的NUM_ROWS_B,你不需要两者。

于 2012-12-24T17:10:50.903 回答
1

分离流程时,您需要平衡发送结果所花费的时间与节省的成本。在您的情况下,我猜您发送的计算需要比本地计算更长的时间。

尝试与其他进程共享更大的工作块。

于 2012-12-24T10:40:10.497 回答