我正在使用多处理池在 Python 中运行并行模拟,它在具有多核的计算机中运行良好。现在我想在使用多个节点的集群上执行程序。我想多处理不能应用于分布式内存。但 mpi4py 似乎是一个不错的选择。那么这些代码最简单的 mpi4py 等价物是什么:
from multiprocessing import Pool
pool = Pool(processes=16)
pool.map(functionName,parameters_list)
我正在使用多处理池在 Python 中运行并行模拟,它在具有多核的计算机中运行良好。现在我想在使用多个节点的集群上执行程序。我想多处理不能应用于分布式内存。但 mpi4py 似乎是一个不错的选择。那么这些代码最简单的 mpi4py 等价物是什么:
from multiprocessing import Pool
pool = Pool(processes=16)
pool.map(functionName,parameters_list)
我的一个旧包是建立在它之上的,mpi4py
它可以为MPI
工作提供功能性并行映射。它不是为了速度而构建的——它是为了实现MPI
从解释器到计算集群的并行映射(即不需要从命令行运行mpiexec
)。本质上:
>>> from pyina.launchers import MpiPool, MpiScatter
>>> pool = MpiPool()
>>> jobs = MpiScatter()
>>> def squared(x):
... return x**2
...
>>> pool.map(squared, range(4))
[0, 1, 4, 9]
>>> jobs.map(sqaured, range(4))
[0, 1, 4, 9]
炫耀“工人池”策略和将工作分配给工人的“分散聚集”策略。当然,我不会将它用于这么小的工作,squared
因为生成世界的开销MPI
非常高(比设置 a 高得多multiprocessing
Pool
)。但是,如果您有一项大工作要运行,就像您通常使用 运行在集群上一样MPI
,那么pyina
这对您来说可能是一个很大的好处。
但是,使用的真正优势pyina
在于它不仅可以使用 生成作业MPI
,而且可以将作业生成到调度程序。 pyina
理解和抽象多个调度程序的启动语法。
pyina
使用调度程序对地图的典型调用如下所示:
>>> # instantiate and configure a scheduler
>>> from pyina.schedulers import Torque
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> torque = Torque(**config)
>>>
>>> # instantiate and configure a worker pool
>>> from pyina.launchers import Mpi
>>> pool = Mpi(scheduler=torque)
>>>
>>> # do a blocking map on the chosen function
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]
几种常见的配置可用作预配置的地图。以下与上面的示例相同:
>>> # instantiate and configure a pre-configured worker pool
>>> from pyina.launchers import TorqueMpiPool
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> pool = TorqueMpiPool(**config)
>>>
>>> # do a blocking map on the chosen function
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]
pyina
需要一些 TLC,因为它仍然存在python2.7
,并且几年没有发布......但它一直保持最新(在 github 上),并且能够“完成工作”为我运行大型工作 -在过去 10 年中扩展了计算集群——尤其是在与pathos
(提供隧道和地图ssh
的统一接口)结合使用时。 还没有利用共享内存,但确实很好地完成了令人尴尬的功能并行计算。与调度程序的交互总体上非常好,但对于一些失败的情况可能会有点粗糙——而且非阻塞映射需要大量的工作。话虽如此,multiprocessing
ParallelPython
pyina
MPI
在此处获取pyina
(和pathos
):https ://github.com/uqfoundation
我使用下面的代码相当于multiprocessing.Pool。它还没有经过广泛的测试,但它似乎工作得很好:
from functools import partial
function = partial(...) # Store all fixed parameters this way if needed
if use_MPI:
arguments = range(num_runs)
run_data = None
# mpi4py
comm = MPI.COMM_SELF.Spawn(sys.executable, args=['MPI_slave.py'], maxprocs=num_runs) # Init
comm.bcast(function, root=MPI.ROOT) # Equal for all processes
comm.scatter(arguments, root=MPI.ROOT) # Different for each process
comm.Barrier() # Wait for everything to finish...
run_data = comm.gather(run_data, root=MPI.ROOT) # And gather everything up
else:
# multiprocessing
p = Pool(multiprocessing.cpu_count())
run_data = p.map(function, range(num_runs))
然后使用一个单独的文件“MPI_slave.py”:
from mpi4py import MPI
# import the function you actually pass to this file here!!!
comm = MPI.COMM_SELF.Get_parent()
size = comm.Get_size()
rank = comm.Get_rank()
def runSlaveRun():
function = None
options = None
# print("Process {}/{} reporting for duty!".format(rank, size))
function = comm.bcast(function, root=0)
arguments = comm.scatter(options, root=0)
results = function(arguments)
comm.Barrier()
comm.gather(results, root=0)
comm.Disconnect()
if __name__ == '__main__':
runSlaveRun()