1

我正在运行一个具有 mpirun 和 2 个内核的进程,当我在两个进程之间混合值时,它会被杀死。这两个进程都使用了大约 15% 的机器内存,即使混合时内存会增加,但仍然应该有足够的内存。所以我假设用于在进程之间传递消息的内存量是有限制的。我如何找出这个限制是什么以及如何删除它?

当 mpirun 死亡时,我收到的错误消息是:

File "Comm.pyx", line 864, in mpi4py.MPI.Comm.bcast (src/mpi4py.MPI.c:67787)
File "pickled.pxi", line 564, in mpi4py.MPI.PyMPI_bcast (src/mpi4py.MPI.c:31462)
File "pickled.pxi", line 93, in mpi4py.MPI._p_Pickle.alloc (src/mpi4py.MPI.c:26327)
SystemError: Negative size passed to PyBytes_FromStringAndSize

这是导致错误的代码位:

sum_updates_j_k = numpy.zeros((self.col.J_total, self.K), dtype=numpy.float64))        
comm.Reduce(self.updates_j_k, sum_updates_j_k, op=MPI.SUM) 
sum_updates_j_k = comm.bcast(sum_updates_j_k, root=0) 

该代码通常有效,它只会遇到大量数据的问题,这使得我在进程之间交换的矩阵的大小增加

4

4 回答 4

2

罪魁祸首可能是在代码中发现的以下几行PyMPI_bcast()

cdef int count = 0
...
if dosend: smsg = pickle.dump(obj, &buf, &count)  # <----- (1)
with nogil: CHKERR( MPI_Bcast(&count, 1, MPI_INT, # <----- (2)
                              root, comm) )
cdef object rmsg = None
if dorecv and dosend: rmsg = smsg
elif dorecv: rmsg = pickle.alloc(&buf, count)
...

这里发生的是对象首先在(1)using时序列化,pickle.dump()然后在(2).

这里有两个问题,它们都与int用于长度的事实有关。第一个问题是内部的整数转换pickle.dump,另一个问题是MPI_INT用于传输腌制流的长度。这将矩阵中的数据量限制为一定的大小 - 即导致腌制对象不大于 2 GiB(2 31 -1 字节)的大小。任何更大的对象都会导致整数溢出,从而导致count.

这显然不是 MPI 问题,而是(或?)中的一个错误mpi4py

于 2013-11-18T09:02:55.867 回答
1

我最近对 ​​mpi4py 也有同样的问题。正如 Hristo Iliev 在他的回答中指出的那样,这是一个泡菜问题。

这可以通过使用不使用 pickle的大写方法 comm.Reduce(),comm.Bcast()等来避免,而不是像. 作为奖励,大写方法也应该更快一些。comm.reduce()

实际上,您已经在使用comm.Reduce(),所以我希望切换到comm.Bcast()应该可以解决您的问题 - 它对我有用。

注意:大写方法的语法略有不同,但本教程可以帮助您入门。

例如,而不是:

sum_updates_j_k = comm.bcast(sum_updates_j_k, root=0) 

你会使用:

comm.Bcast(sum_updates_j_k, root=0) 
于 2016-03-08T13:48:19.247 回答
0

显然这是 MPI 本身的问题,而不是 MPI4py 中的问题。保存正在传输的数据大小的实际变量是一个带符号的 32 位整数,对于大约 2GB 的数据,它将溢出为负值。

使用 MPI::Send 可以发送的最大数据量

以前这里也曾作为 MPI4py 的问题提出过。

于 2021-04-15T18:06:57.373 回答
0

numpy对于这种情况,有一个可以部分发送数组的函数很有用,例如:

from mpi4py import MPI
import math, numpy
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
def bcast_array_obj(obj = None, dtype = numpy.float64, root = 0):
    """Function for broadcasting of a numpy array object"""
    reporter = 0 if root > 0 else 1
    if rank == root:
        for exp in range(11):
            parts = pow(2, exp)
            err = False
            part_len = math.ceil(len(obj) / parts)
            for part in range(parts):
                part_begin = part * part_len
                part_end = min((part + 1) * part_len, len(obj))
                try:
                    comm.bcast(obj[part_begin: part_end], root = root)
                except:
                    err = True
                err *= comm.recv(source = reporter, tag = 2)
                if err:
                    break
            if err:
                continue
            comm.bcast(None, root = root)
            print('The array was successfully sent in {} part{}'.\
                  format(parts, 's' if parts > 1 else ''))
            return
        sys.stderr.write('Failed to send the array even in 1024 parts')
        sys.stderr.flush()
    else:
        obj = numpy.zeros(0, dtype = dtype)
        while True:
            err = False
            try:
                part_obj = comm.bcast(root = root)
            except:
                err = True
                obj = numpy.zeros(0, dtype = dtype)
            if rank == reporter:
                comm.send(err, dest = root, tag = 2)
            if err:
                continue
            if type(part_obj) != type(None):
                frags = len(obj)
                obj.resize(frags + len(part_obj))
                obj[frags: ] = part_obj
            else:
                break
        return obj

此函数自动确定打破输入数组的最佳零件数。

例如,

if rank != 0:
    z = bcast_array_obj(root = 0)
else:
    z = numpy.zeros(1000000000, dtype = numpy.float64)
    bcast_array_obj(z, root = 0)

输出

The array was successfully sent in 4 parts
于 2015-09-08T11:51:02.990 回答