虽然 MPI 有一个有序的保证,即从 rank 1 到 rank 0 的两条消息将按照它们发送的顺序被 rank 0 接收——一条消息不能超过另一条消息——MPI 什么也没说,也不能说什么,关于它们将如何与来自其他处理器的其他消息交错。因此,您可以轻松获得以下情况:
rank 1 messages to rank 0: [src 1, msg A, tag 1], [src 1, msg B, tag 2]
rank 2 messages to rank 0: [src 2, msg C, tag 1], [src 2, msg D, tag 2]
rank 0 message queue: [src 1, msg A, tag 1], [src 2, msg C, tag 1], [src 2, msg D, tag 2], [src 1, msg B, tag 2]
因此,排名 0 提取带有标签 1 的消息将获得排名 1 的 msg A,但随后使用标签 2 将获得排名 2 的 msg D。(请注意,上面的消息队列满足上面的有序保证,但在这里对我们没有帮助)。
有几种方法可以解决这个问题。一种是newdeltaw
不仅按标签过滤接收到的消息,而且按来源过滤,以确保它来自发送以下内容的同一任务neww
:
if rank == 0:
cb = numpy.zeros(size)
rstat = MPI.Status()
for i in range((size-1)*3):
neww = comm.recv(source=MPI.ANY_SOURCE, tag=1, status=rstat)
src = rstat.Get_source()
newdeltaw = comm.recv(source=src, tag=2)
print "newdelw is",newdeltaw,"neww is",neww
cb[neww]=cb[neww]+newdeltaw
print "cb=",cb
else:
data = rank
for i in range(3):
comm.send(rank,dest=0,tag=1)
comm.send(data,dest=0,tag=2)
这样,只接收到来自匹配源的 tag-2 newdeltaw 消息,避免了不一致。
另一种方法是完全避免拆分消息,将两条数据放入同一条消息中:
if rank == 0:
cb = numpy.zeros(size)
rstat = MPI.Status()
for i in range((size-1)*3):
(neww,newdeltaw) = comm.recv(source=MPI.ANY_SOURCE, tag=1)
print "newdelw is",newdeltaw,"neww is",neww
cb[neww]=cb[neww]+newdeltaw
print "cb=",cb
else:
data = rank
for i in range(3):
comm.send((rank,data),dest=0,tag=1)
这会将两条数据捆绑到一条消息中,因此它们不能分开。(请注意,一旦这工作,您可以使用更有效的低级 mpi4py 例程来避免序列化元组:
if rank == 0:
cb = numpy.zeros(size)
rstat = MPI.Status()
for i in range((size-1)*3):
dataarr = numpy.zeros(2,dtype='i')
comm.Recv([dataarr,MPI.INT],source=MPI.ANY_SOURCE, tag=1)
newdeltaw = dataarr[0]
neww = dataarr[1]
print "newdelw is",newdeltaw,"neww is",neww
cb[neww]=cb[neww]+newdeltaw
print "cb=",cb
else:
data = rank
for i in range(3):
senddata = numpy.array([rank,data],dtype='i')
comm.Send([senddata, MPI.INT],dest=0,tag=1)
最后,您可以完全避免主/从方法,并让所有处理器处理它们在问题中的部分结果,然后在最后将所有结果与 reduce 操作结合起来:
cb = numpy.zeros(size,dtype='i')
totals = numpy.zeros(size,dtype='i')
data = rank
for i in range(3):
cb[rank] = cb[rank] + data
comm.Reduce([cb,MPI.INT], [totals,MPI.INT], op=MPI.SUM, root=0)
if rank == 0:
print "result is", totals