1

我正在尝试进行无序的一对一通信。基本上我有多个相同大小的浮点数组,由整数 id 标识。

每条消息应如下所示:

<int id><float array data>

在接收方,它确切地知道有多少个数组,因此设置了准确的 recv 数量。收到消息后,它会解析 id 并将数据放入正确的位置。问题是消息可以从任何其他进程发送到接收进程。(例如,生产者有一个工作队列结构,并处理队列中可用的任何 id。)

由于 MPI 只保证 P2P 的订单交付,我不能轻易地将整数 id 和 FP 数据放在两条消息中,否则接收方可能无法将 id 与数据匹配。MPI 也不允许一次发送两种类型的数据。

我只能想到两种方法。

1) Receiver 有一个大小为 m (source[m]) 的数组,m 是发送节点的数量。发送者首先发送 id,然后是数据。接收者收到来自发送者 i 的整数消息后,将 id 保存到 source[i]。从发送者 i 接收到 FP 数组后,它会检查 source[i],获取 id,并将数据移动到正确的位置。它之所以有效,是因为 MPI 保证了有序的 P2P 通信。它要求接收者保存每个发送者的状态信息。更糟糕的是,如果单个发送进程可以在数据之前发送两个 id(例如多线程),则此机制将不起作用。

2)把id和FP当作字节,复制到发送缓冲区。将它们作为 MPI_CHAR 发送,接收器将它们转换回整数和 FP 数组。然后我需要支付将内容复制到发送方字节缓冲区的额外费用。随着 MPI 进程中线程数量的增加,总临时缓冲区也会增加。

它们都不是完美的解决方案。我不想在进程中锁定任何东西。我想知道你们中是否有人有更好的建议。

编辑:代码将在具有 infiniband 的共享集群上运行。机器将随机分配。所以我认为 TCP 套接字在这里不能帮助我。此外,IPoIB 看起来很昂贵。我确实需要完整的 40Gbps 通信速度,并让 CPU 进行计算。

4

2 回答 2

4

您可以MPI_ANY_SOURCE在接收函数中指定源排名,然后使用它们的标签对消息进行排序,这比创建自定义消息更容易。这是一个简化的示例:

#include <stdio.h>
#include "mpi.h"

int main() {
    MPI_Init(NULL,NULL);
    int rank=0;
    int size=1;
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);
    MPI_Comm_size(MPI_COMM_WORLD,&size);

    // Receiver is the last node for simplicity in the arrays
    if (rank == size-1) {
        // Receiver has size-1 slots
        float data[size-1];
        MPI_Request request[size-1];

        // Use tags to sort receives
        for (int tag=0;tag<size-1;++tag){
            printf("Receiver for id %d\n",tag);
            // Non-blocking receive
            MPI_Irecv(data+tag,1,MPI_FLOAT,
                      MPI_ANY_SOURCE,tag,MPI_COMM_WORLD,&request[tag]);
        }

        // Wait for all requests to complete
        printf("Waiting...\n");
        MPI_Waitall(size-1,request,MPI_STATUSES_IGNORE);
        for (size_t i=0;i<size-1;++i){
            printf("%f\n",data[i]);
        }
    } else {
        // Producer
        int id = rank;
        float data = rank;
        printf("Sending {%d}{%f}\n",id,data);
        MPI_Send(&data,1,MPI_FLOAT,size-1,id,MPI_COMM_WORLD);
    }

    return MPI_Finalize();
}
于 2012-10-03T06:51:56.363 回答
3

正如有人已经写过的那样,您可以使用MPI_ANY_SOURCE从任何来源接收。要在一次发送中发送两种不同类型的数据,您可以使用派生数据类型

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

#define asize 10

typedef struct data_ {
  int   id;
  float array[asize];
} data;

int main() {

  MPI_Init(NULL,NULL);

  int rank = -1;
  int size = -1;
  MPI_Comm_rank(MPI_COMM_WORLD,&rank);
  MPI_Comm_size(MPI_COMM_WORLD,&size);

  data buffer;    
 // Define and commit a new datatype
  int          blocklength [2];
  MPI_Aint     displacement[2];
  MPI_Datatype datatypes   [2];
  MPI_Datatype mpi_tdata;

  MPI_Aint     startid,startarray;
  MPI_Get_address(&(buffer.id),&startid);
  MPI_Get_address(&(buffer.array[0]),&startarray);

  blocklength [0] = 1;
  blocklength [1] = asize;
  displacement[0] = 0;
  displacement[1] = startarray - startid;
  datatypes   [0] = MPI_INT;
  datatypes   [1] = MPI_FLOAT;

  MPI_Type_create_struct(2,blocklength,displacement,datatypes,&mpi_tdata);
  MPI_Type_commit(&mpi_tdata);

  if (rank == 0) {
    int        count = 0;
    MPI_Status status;

    while (count < size-1 ) {
      // Non-blocking receive
      printf("Receiving message %d\n",count);
      MPI_Recv(&buffer,1,mpi_tdata,MPI_ANY_SOURCE,0,MPI_COMM_WORLD,&status);
      printf("Message tag %d, first entry %g\n",buffer.id,buffer.array[0]);
      // Counting the received messages 
      count++;
    }

  } else {
    // Initialize buffer to be sent
    buffer.id = rank;
    for (int ii = 0; ii < size; ii++) {
      buffer.array[ii] = 10*rank + ii;
    }
    // Send buffer
    MPI_Send(&buffer,1,mpi_tdata,0,0,MPI_COMM_WORLD);
  }

  MPI_Type_free(&mpi_tdata);

  MPI_Finalize();
  return 0;
}
于 2012-10-03T08:49:49.857 回答