3

我正在尝试并行化我编写的脚本。每个进程都需要进行计算并将数据存储到数组的特定部分(列表列表)。每个进程都在计算和存储它的数据,但我不知道如何将数据从非根进程获取到根进程,以便它可以将数据打印到文件中。我为我的脚本创建了一个最小的工作示例——这个示例仅为简单起见而设计为在 2 个内核上运行:

from mpi4py import MPI 
import pdb 
import os

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Declare the array that will store all the temp results
temps = [[0 for x in xrange(5)] for x in xrange(4)]

# Loop over all directories
if rank==0:
   counter = 0 
   for i in range(2):
      for j in range(5):
         temps[i][j] = counter
     counter = counter + 1 

else:
   counter = 20
   for i in range(2,4):
      for j in range(5):
         temps[i][j] = counter
         counter = counter + 1 

temps = comm.bcast(temps,root=0)

if rank==0:

   print temps

我使用以下命令执行脚本:

mpiexec -n 2 python mne.py

案例结束后,输出为:

[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [0, 0, 0, 0, 0], [0, 0, 0, 0, 0]]

所以你可以看到数据共享没有按我的意愿工作。有人可以告诉我将数据返回根进程的正确方法吗?

4

1 回答 1

5

代码工作正常,只是没有做你想做的事。

这条线

temps = comm.bcast(temps,root=0)

将处理器 0 的变量广播temps到所有处理器(包括等级 0),这当然给出了上面的结果。您想使用gather(或者allgather,如果您希望所有处理器都有答案)。那看起来更像这样:

from mpi4py import MPI
import pdb
import os

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

assert size == 2

# Declare the array that will store all the temp results
temps = [[0 for x in xrange(5)] for x in xrange(4)]

# declare the array that holds the local results
locals =[[0 for x in xrange(5)] for x in xrange(2)]

# Loop over all directories
if rank==0:
   counter = 0
   for i in range(2):
      for j in range(5):
         locals[i][j] = counter
         counter = counter + 1

else:
   counter = 20
   for i in range(2):
      for j in range(5):
         locals[i][j] = counter
         counter = counter + 1

temps = comm.gather(locals,temps,root=0)

if rank==0:
   print temps

如果您真的想就地进行收集,并且您知道(比如说)所有真实数据都将大于您初始化数据所用的零,您可以使用归约操作,但这更容易使用 numpy 数组:

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

assert size == 2

# Declare the array that will store all the temp results
temps = numpy.zeros((4,5))

# Loop over all directories
if rank==0:
   counter = 0
   for i in range(2):
      for j in range(5):
         temps[i,j] = counter
         counter = counter + 1

else:
   counter = 20
   for i in range(2,4):
      for j in range(5):
         temps[i,j] = counter
         counter = counter + 1

comm.Allreduce(MPI.IN_PLACE,temps,op=MPI.MAX)

if rank==0:
   print temps
于 2012-12-14T15:48:28.093 回答