4

如何修改以下代码(改编自http://materials.jeremybejarano.com/MPIwithPython/pointToPoint.html),以便comm.Send接收每个实例root = 0并打印输出。目前,只接收到第一个发送命令。

#passRandomDraw.py
import numpy
from mpi4py import MPI
from mpi4py.MPI import ANY_SOURCE
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    randNum = numpy.zeros(1)
    print "Process before receiving random numbers"


else:
    for i in range(0,np.random.randint(1,10),1):
        randNum = numpy.zeros(1)
        randNum = numpy.random.random_sample(1)
        print "Process", rank, "iteration", i, "drew the number", randNum[0]
        comm.Send(randNum, dest=0)


if rank == 0:
   comm.Recv(randNum, ANY_SOURCE)
   print "Process", rank, "received the number", randNum[0]
4

2 回答 2

3

如果您不知道要发送多少条消息,那么您必须引入一条消息来标记消息的结束。您可以通过使用特殊标签来通用地使用它。为避免为终止消息提供不匹配的缓冲区,您可以使用probe检查传入的消息类型

tag_data = 42
tag_end = 23

if rank == 0:
    randNum = numpy.zeros(1)
    print "Process before receiving random numbers"
else:
    for i in range(0,np.random.randint(1,10),1):
        randNum = numpy.zeros(1)
        randNum = numpy.random.random_sample(1)
        print "Process", rank, "iteration", i, "drew the number", randNum[0]
        comm.Send(randNum, dest=0, tag=tag_data)
    # send the termination message. Using the lower-case interface is simpler
    comm.send(None, dest=0, tag=tag_end)

if rank == 0:
    # For debugging it might be better to use a list of still active procsses
    remaining = comm.Get_size() - 1
    while remaining > 0:
        s = MPI.Status()
        comm.Probe(status=s)
        # make sure we post the right kind of message
        if s.tag == tag_data:
            comm.Recv(randNum, s.source, tag=tag_data)
            print "Process ", s.source, " received the number", randNum[0]
        elif s.tag == tag_end:
            # don't need the result here
            print "Process ", rank, " is done"
            comm.recv(source=s.source, tag=tag_end)
            remaining -= 1

这有很多变化。例如,如果您知道一条消息是最后一条消息,您可以合并终止消息。

于 2016-03-18T21:28:02.217 回答
1

如果每个进程都知道要发送的消息数量,则可以设计以下步骤来解决问题:

1)减少发送到根进程的消息数量。每个进程向根发送它稍后将发送的消息数量。此操作称为归约,可以由函数执行comm.reduce(...)

2)接收进程0上的所有消息。

这是基于您的代码,应该可以解决问题。它可以运行mpirun -np 4 python main.py

#passRandomDraw.py
import numpy
from mpi4py import MPI
from mpi4py.MPI import ANY_SOURCE
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

#just in case, if numpy.random is seed with 
np.random.seed(np.random.randint(np.iinfo(np.uint32).min,np.iinfo(np.uint32).max)+rank)

if rank == 0:
    randNum = numpy.zeros(1)
    print "Process before receiving random numbers"
    nb=np.empty((1,),dtype=int)
    nb0=np.zeros((1,),dtype=int)
    comm.Reduce([nb0, MPI.INT],[nb, MPI.INT],op=MPI.SUM, root=0)  #sums the total number of random number from every process on rank 0, in nb.
    #print "rank"+str(rank)+" nb "+str(nb)
else:
    nb=np.empty((1,),dtype=int)
    nb[0]=np.random.randint(1,10)
    #print "rank"+str(rank)+" nb "+str(nb)
    comm.Reduce([nb, MPI.INT],None,op=MPI.SUM, root=0)
    for i in range(0,nb[0],1):
        randNum = numpy.zeros(1)
        randNum = numpy.random.random_sample(1)
        print "Process", rank, "iteration", i, "drew the number", randNum[0]
        comm.Send(randNum, dest=0)



if rank == 0:
   for i in range(nb[0]): #receives nb message, each one with its int.
       comm.Recv(randNum, ANY_SOURCE)
       print "Process", rank, "received the number", randNum[0]

根据Mersenne Twister 伪随机数生成器的文档,numpy.random()/dev/urandom最初由从(或 Windows 模拟)提取的数字(如果可用)或从时钟中提取的数字作为种子。因此,在最后一种情况下,所有进程都可以接收相同的种子并生成相同的随机数。为了防止这种情况发生,我添加了以下行:

np.random.seed(np.random.randint(np.iinfo(np.uint32).min,np.iinfo(np.uint32).max)+rank)
于 2016-03-18T21:41:03.397 回答