实现一个共享计数器并非易事,但是一旦你完成它并将它放在某个库中的某个地方,你就可以用它做很多事情。
在使用 MPI-2书中,如果你要实现这些东西,你应该得到它,其中一个示例(代码可在线获得)是一个共享计数器。“不可扩展”的应该适用于几十个进程——计数器是一个 0..size-1 的整数数组,每个等级一个,然后“获取下一个工作项#”操作包括锁定窗口,读取其他人对计数器的贡献(在这种情况下,他们已经采取了多少项目),更新您自己的(++),关闭窗口并计算总数。这一切都是通过被动的片面操作来完成的。(更好的缩放只使用一棵树而不是一维数组)。
所以用途是你说 0 级托管计数器,每个人都继续做工作单元并更新计数器以获取下一个,直到没有更多工作为止;然后你在障碍物或其他地方等待并最终确定。
一旦你有这样的东西 - 使用共享值来获得下一个可用的工作单元 - 工作,那么你可以推广到更复杂的方法。因此,正如 suzterpatt 所建议的那样,每个人在开始时都接受“他们的份额”的工作单元效果很好,但是如果有些人完成得比其他人快怎么办?现在通常的答案是偷工减料。每个人都将他们的工作单元列表保存在一个出队中,然后当一个人用完工作时,它会从其他人的出队的另一端窃取工作单元,直到没有更多工作了。这实际上是完全分布式的 master-worker 版本,不再需要单一的 master 分区工作。一旦你有一个共享计数器工作,你就可以从中创建互斥体,然后你可以实现出队。
更新: 好的,所以这是一个 hacky-attempt 在做共享计数器 - 我在 MPI-2 书中的简单版本:似乎有效,但我不会说比这更强大的东西(没有玩过这个东西很久了)。有一个简单的计数器实现(对应于 MPI-2 书中的非缩放版本),带有两个简单的测试,一个大致对应于您的工作案例;每个项目都会更新计数器以获取工作项,然后执行“工作”(随机睡眠时间)。在每次测试结束时,计数器数据结构被打印出来,这是每个等级所做的增量#。
#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
struct mpi_counter_t {
MPI_Win win;
int hostrank ;
int myval;
int *data;
int rank, size;
};
struct mpi_counter_t *create_counter(int hostrank) {
struct mpi_counter_t *count;
count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t));
count->hostrank = hostrank;
MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank));
MPI_Comm_size(MPI_COMM_WORLD, &(count->size));
if (count->rank == hostrank) {
MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->data));
for (int i=0; i<count->size; i++) count->data[i] = 0;
MPI_Win_create(count->data, count->size * sizeof(int), sizeof(int),
MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
} else {
count->data = NULL;
MPI_Win_create(count->data, 0, 1,
MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
}
count -> myval = 0;
return count;
}
int increment_counter(struct mpi_counter_t *count, int increment) {
int *vals = (int *)malloc( count->size * sizeof(int) );
int val;
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);
for (int i=0; i<count->size; i++) {
if (i == count->rank) {
MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM,
count->win);
} else {
MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
}
}
MPI_Win_unlock(0, count->win);
count->myval += increment;
vals[count->rank] = count->myval;
val = 0;
for (int i=0; i<count->size; i++)
val += vals[i];
free(vals);
return val;
}
void delete_counter(struct mpi_counter_t **count) {
if ((*count)->rank == (*count)->hostrank) {
MPI_Free_mem((*count)->data);
}
MPI_Win_free(&((*count)->win));
free((*count));
*count = NULL;
return;
}
void print_counter(struct mpi_counter_t *count) {
if (count->rank == count->hostrank) {
for (int i=0; i<count->size; i++) {
printf("%2d ", count->data[i]);
}
puts("");
}
}
int test1() {
struct mpi_counter_t *c;
int rank;
int result;
c = create_counter(0);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
result = increment_counter(c, 1);
printf("%d got counter %d\n", rank, result);
MPI_Barrier(MPI_COMM_WORLD);
print_counter(c);
delete_counter(&c);
}
int test2() {
const int WORKITEMS=50;
struct mpi_counter_t *c;
int rank;
int result = 0;
c = create_counter(0);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
srandom(rank);
while (result < WORKITEMS) {
result = increment_counter(c, 1);
if (result <= WORKITEMS) {
printf("%d working on item %d...\n", rank, result);
sleep(random() % 10);
} else {
printf("%d done\n", rank);
}
}
MPI_Barrier(MPI_COMM_WORLD);
print_counter(c);
delete_counter(&c);
}
int main(int argc, char **argv) {
MPI_Init(&argc, &argv);
test1();
test2();
MPI_Finalize();
}