1

我正在使用 mpi4py 在 Python 中运行 MPI 代码,如下所示:

from mpi4py import MPI
import numpy as np
import os

comm = MPI.COMM_WORLD
rank = comm.Get_Rank()
size = comm.Get_Size()

if rank == 0:
  res = np.zeros(2**16)
  jobs = os.listdir('/my/data/dir')
  for i in xrange(len(jobs)):
    proc = (i % (size - 1)) + 1 #lacks load balancing
    buf = load_buf_from_file(job[i])
    #root waits here at 100%
    comm.Send([buf, dtype], dest = proc) #lacks load balancing
    comm.Recv([res, dtype], source = MPI.ANY_SOURCE)
    save_result_to_file(res)
else:
  buf = np.zeros(2**16)
  comm.Recv([buf, dtype], source = 0)
  res = do_lots_of_work(buf)
  comm.Send([res, dtype], dest = 0)

我注意到根进程总是很忙(CPU 为 100%)。我更喜欢根进程休眠,直到工作进程准备好接收下一条消息。MPI 编程中有哪些促进这种行为的模式?也许根进程也应该工作?

这种设计的另一个缺陷如下......如果worker proc 4 在3 之前完成,那么4 必须等待3 完成,然后才能从root 获取新消息以继续工作。有关如何设计始终尝试将下一条消息发送到空闲进程的根进程的任何建议?这对我来说基本没问题,因为接收消息的第一个进程通常是第一个完成的进程。但是,如果每条消息的工作负载都发生变化,则情况并非总是如此。

谢谢,凯文

4

4 回答 4

3

对于您的第一个问题,关于 Comm.Recv 中排名 0 时的 CPU 使用情况。那是一个执行问题。MPICH(可能还有许多其他人)在紧密的轮询循环中等待事件,以最大限度地减少延迟。

你的第二个问题:如果工作单位不规律,如何平衡工作量。答案是非阻塞操作。(Isend、Irecv 等)。

一种可能的工作流程可能是这样的:

  • rank 0 有一个工作单元队列
  • 排名 0 向每个客户端发布非阻塞发送
  • 当客户端想要工作时,它从服务器接收并发送回就绪消息
  • 服务器获取就绪消息并发送一个工作单元。
  • 服务器还为最终的“我完成了”消息发出非阻塞接收。
  • 当任何客户端完成时,它会发出“我完成了,给我更多”消息
  • 服务器发送队列中的下一个工作单元。
于 2014-09-15T13:39:52.837 回答
1

也许使用一个等级作为服务器来分配作业最适合负载平衡:

#!/usr/bin/env python
import mpi4py

import numpy as np
import os
import time

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
root = 0

if rank == root:
  for i in range(50):
    d = comm.recv(source = MPI.ANY_SOURCE)
    comm.send(i, dest = d)
  for i in range(size-1):
    d = comm.recv(source = MPI.ANY_SOURCE)
    comm.send(-1, dest = d)
    print('Closing', d)
else:
    while True:
        comm.send(rank, root)
        job = comm.recv(source = root)
        if job < 0: break

        print('Rank {} Job {}'.format(rank, job))
        time.sleep(np.random.random()%10)
于 2016-02-10T07:43:11.753 回答
0

我刚刚做了一些与你的回答非常相似的事情,但我可以提供几个替代方案。我还发布了我的代码的简化版本

首先,您可以通过侦听任何工作进程的第一个响应来使其更具响应性。MPI.ANY_SOURCE为此目的有一个特殊的源值。使用状态对象来确定消息来自哪个实际来源。

# Set this to 0 for maximum responsiveness, but that will peg CPU to 100%
sleep_seconds = 0.1
if sleep_seconds > 0:
    while not comm.Iprobe(source=MPI.ANY_SOURCE):
        time.sleep(sleep_seconds)

status = MPI.Status()
result = comm.recv(source=MPI.ANY_SOURCE, status=status)
logging.info('Received %r from rank %d', result, status.Get_source())

我做了一些搜索,发现如果您不与其他任务共享您的处理器,那么您想要的是忙碌的等待。在这种情况下,您只需在我的代码段中将 sleep_seconds 设置为 0 或直接调用recv(). 我们有时不得不分享我们的环境,所以我要去投票。

Jörg Bornschein 的 mpi4py-examples 包括Task Pull 示例,该示例将不同长度的任务分配给一组工人。我认为,用我上面的代码片段代替他的电话recv()会给你一个很好的解决方案。

我的一些工作人员启动了多线程任务,因此我不想过度订阅我的处理器。我发布了一个多线程要点,它知道每个任务将启动多少个线程,并为额外的线程留出额外的工作人员。例如,如果一个任务要在四个线程上运行,master 将等待,直到同一主机上有四个 worker 可用,然后它将任务传递给其中一个 worker,让其他三个空闲,直到任务完成.

于 2015-01-28T00:11:04.357 回答
0

我通过在从根发送之前向 MPI 例程添加更多逻辑来解决这个问题:

if i > size - 1:
  #probe for response, and send next message to proc that responds first
  #sleep 1 second between probing different procs
  r = 1
  while not comm.Iprobe(source = r):
    time.sleep(1)
    r = (r % (size - 1)) + 1
  res = comm.Recv([res, dtype], source = r)
  proc = r
else:
  #initialize the sends in serial (proc 1, ..., size-1)
  proc = i + 1

在 MPI 中还有其他方法可以做到这一点吗?

于 2014-09-14T21:58:05.910 回答