9

我正在使用多处理池在 Python 中运行并行模拟,它在具有多核的计算机中运行良好。现在我想在使用多个节点的集群上执行程序。我想多处理不能应用于分布式内存。但 mpi4py 似乎是一个不错的选择。那么这些代码最简单的 mpi4py 等价物是什么:

from multiprocessing import Pool

pool = Pool(processes=16)

pool.map(functionName,parameters_list)
4

3 回答 3

4

这里实现了一个MPIPool类。

有关我如何使用它的示例,请查看 GitHub 上的此要点

于 2014-08-19T15:38:16.603 回答
4

我的一个旧包是建立在它之上的,mpi4py它可以为MPI工作提供功能性并行映射。它不是为了速度而构建的——它是为了实现MPI从解释器到计算集群的并行映射(即不需要从命令行运行mpiexec)。本质上:

>>> from pyina.launchers import MpiPool, MpiScatter
>>> pool = MpiPool()
>>> jobs = MpiScatter()
>>> def squared(x):
...   return x**2
... 
>>> pool.map(squared, range(4))
[0, 1, 4, 9]
>>> jobs.map(sqaured, range(4))
[0, 1, 4, 9]

炫耀“工人池”策略和将工作分配给工人的“分散聚集”策略。当然,我不会将它用于这么小的工作,squared因为生成世界的开销MPI非常高(比设置 a 高得多multiprocessing Pool)。但是,如果您有一项大工作要运行,就像您通常使用 运行在集群上一样MPI,那么pyina这对您来说可能是一个很大的好处。

但是,使用的真正优势pyina在于它不仅可以使用 生成作业MPI,而且可以将作业生成到调度程序。 pyina理解和抽象多个调度程序的启动语法。

pyina使用调度程序对地图的典型调用如下所示:

>>> # instantiate and configure a scheduler
>>> from pyina.schedulers import Torque
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> torque = Torque(**config)
>>> 
>>> # instantiate and configure a worker pool
>>> from pyina.launchers import Mpi
>>> pool = Mpi(scheduler=torque)
>>>
>>> # do a blocking map on the chosen function
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]

几种常见的配置可用作预配置的地图。以下与上面的示例相同:

>>> # instantiate and configure a pre-configured worker pool
>>> from pyina.launchers import TorqueMpiPool
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> pool = TorqueMpiPool(**config)
>>>
>>> # do a blocking map on the chosen function
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]

pyina需要一些 TLC,因为它仍然存在python2.7,并且几年没有发布......但它一直保持最新(在 github 上),并且能够“完成工作”为我运行大型工作 -在过去 10 年中扩展了计算集群——尤其是在与pathos(提供隧道和地图ssh的统一接口)结合使用时。 还没有利用共享内存,但确实很好地完成了令人尴尬的功能并行计算。与调度程序的交互总体上非常好,但对于一些失败的情况可能会有点粗糙——而且非阻塞映射需要大量的工作。话虽如此,multiprocessingParallelPythonpyinaMPI

在此处获取pyina(和pathos):https ://github.com/uqfoundation

于 2016-01-05T18:33:42.363 回答
0

我使用下面的代码相当于multiprocessing.Pool。它还没有经过广泛的测试,但它似乎工作得很好:

from functools import partial
function = partial(...)  # Store all fixed parameters this way if needed

if use_MPI:
    arguments = range(num_runs)
    run_data = None

    # mpi4py
    comm = MPI.COMM_SELF.Spawn(sys.executable, args=['MPI_slave.py'], maxprocs=num_runs)  # Init
    comm.bcast(function, root=MPI.ROOT)     # Equal for all processes
    comm.scatter(arguments, root=MPI.ROOT)  # Different for each process
    comm.Barrier()                          # Wait for everything to finish...
    run_data = comm.gather(run_data, root=MPI.ROOT)  # And gather everything up
else:        
    # multiprocessing
    p = Pool(multiprocessing.cpu_count())
    run_data = p.map(function, range(num_runs))

然后使用一个单独的文件“MPI_slave.py”:

from mpi4py import MPI
# import the function you actually pass to this file here!!!
comm = MPI.COMM_SELF.Get_parent()
size = comm.Get_size()
rank = comm.Get_rank()

def runSlaveRun():
    function = None
    options = None
    # print("Process {}/{} reporting for duty!".format(rank, size))

    function = comm.bcast(function, root=0)
    arguments = comm.scatter(options, root=0)
    results = function(arguments)
    comm.Barrier()
    comm.gather(results, root=0)
    comm.Disconnect()

if __name__ == '__main__':
    runSlaveRun()
于 2016-01-05T10:07:03.480 回答