0

我有工人和任务要做:

workers = ['peter', 'paul', 'mary']
tasks = range(13)

现在我想将任务拆分成块或批次的工作,这样每个工人就可以处理一个批次,并且做的工作量与其他人大致相同。在我的现实生活中,我想将批处理作业安排到计算场。批处理作业应该并行运行。实际的调度和调度是由商业级工具如 lsf 或 grid 完成的。

我期望的一些例子:

>>> distribute_work(['peter', 'paul', 'mary'], range(3))
[('peter', [0]), ('paul', [1]), ('mary', [2])]
>>> distribute_work(['peter', 'paul', 'mary'], range(6))
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2, 5])]
>>> distribute_work(['peter', 'paul', 'mary'], range(5))
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2])]

这个问题与这里这里这里的问题非常相似

不同之处在于我想要这些功能,按顺序或优先顺序:

  1. 尽可能不使用len内部长数据结构
  2. 接受生成器
  3. 返回发生器
  4. 尽可能多地使用 stdlib 组件

关于要求的一些旁注:

  • 没有故意的命令:我有同名的工人可以做多个批次(unix 主机名)。如果您的解决方案使用 dicts,那很好,因为我们总是可以通过批量枚举进行工作人员查找。
  • 任意长度:worker 和 tasks 都可以是任何长度 >= 1 的迭代。并且它们不必像上面示例中所示的平均拆分,其中 Mary 只得到一个任务。
  • 秩序:对我来说并不重要。我猜其他人可能更喜欢 [0,1]、[2,3]、[5] 之类的顺序,但我不在乎。如果您的解决方案可以保持或切换顺序,也许值得向其他人指出。

我试图解决itertools这个特殊的问题,并想出了以下代码来说明这个问题:

from itertools import *

def distribute_work(workers, tasks):
    batches = range(len(workers))
    return [ ( workers[k],
               [t[1] for t in i]
               )   for (k,i) in groupby(sorted(zip(cycle(batches),
                                                   tasks),
                                               key=lambda t: t[0]),
                                        lambda t: t[0]) ]

这满足 4.,但排序很可能违反 1.. 和 2./3。甚至没有想过。

可能有一些简单的解决方案,以我没有想到的方式组合一些 stdlib 组件。但也许不是。有接盘侠吗?

4

5 回答 5

2

我想你想用它multiprocessing.Pool.imap来处理你的工人和分配他们的工作。我相信它会做你想做的一切。

jobs = (some generator)                   # can consume jobs from a generator
pool = multiprocessing.Pool(3)            # set number of workers here
results = pool.imap(process_job, jobs)    # returns a generator

for r in results:                         # loop will block until results arrive
    do_something(r)

如果结果的顺序对您的应用程序无关紧要,您也可以使用imap_unordered.

于 2012-10-31T09:04:16.793 回答
1

一定要预批吗?

为什么不只是有一个队列,并让每个工作人员在完成一个工作单元时退出队列?

于 2012-10-30T17:55:50.453 回答
1

按照泰勒的回答

def doleOut(queue, workers):
    for worker,task in itertools.izip(itertools.cycle(workers),queue):
        yield worker,task

(worker, task)只要有队列,这将继续返回元组。所以如果你有阻塞waitForMoreWork,你可以这样做:

queue = []
doler = distribute_work(workers, queue)
while 1:
    queue.append(waitForMoreWork)
    currentqueuelen = len(queue)
    for i in range(0,queuelen):
        worker,item = doler.next()
        worker.passitem(item)

这样,它将阻塞直到有更多队列项目,然后分发这些项目,然后再次阻塞。您可以设置您的 waitForMoreWork 表达式,以便一次分发尽可能多的项目。

于 2012-10-30T19:43:32.947 回答
0

好的,说了不可能,这里有一个想法。也许这是我应该转向 codereview 的东西 - 我非常有兴趣评论这会在内存中产生多少开销。换句话说,我不知道这是否真的解决了任务列表很长且大小未知的问题。正如Blckknght 提到multiprocessing的可能是更好的选择

编码:

import itertools

def distribute_work(workers, tasks):
    """Return one generator per worker with a fair share of tasks

    Task may be an arbitrary length generator.
    Workers should be an iterable.
    """
    worker_count = len(workers)
    worker_ids = range(worker_count)
    all_tasks_for_all_workers = itertools.tee(tasks, worker_count)
    assignments = [ (workers[id], itertools.islice(i, id, None, worker_count))
                    for (id,i) in enumerate(all_tasks_for_all_workers) ]    
    return(assignments)

该算法是

  1. 为每个工人复制一次原始任务列表。由于这只是复制生成器对象,因此它应该与内存中任务列表的大小无关。即使这是一项相对昂贵的操作,它也只是一次启动成本,对于非常大的任务列表来说在内存中是微不足道的。
  2. 要将任务分配给一个工作人员,每个工作人员必须获取任务列表的一部分。如果#W是工人的数量,第一个工人接任务0, #W, 2*#W,3*#W等。第二个工人接0+1, #W+1, 2*#W+1,3*#W+1等。每个工人的拼接可以用itertools.islice

对于纯粹的任务拆分/分配,此功能并不真正需要工作人员的姓名。但是工人的数量是。改变它会使函数更加通用和有用,并使返回值更容易理解。为了回答我自己的问题,我将保留该功能。

用法和结果:

>>> for (worker,tasks) in distribute_work(['peter', 'paul', 'mary'], range(5)):
...   print(worker, list(tasks))
... 
peter [0, 3]
paul [1, 4]
mary [2]

它还处理工人具有相同名称但不同实体的情况:

>>> for (worker,tasks) in distribute_work(['p', 'p', 'mary'], range(5)): 
...   print(worker, list(tasks))
... 
p [0, 3]
p [1, 4]
mary [2]
于 2012-10-31T10:24:00.617 回答
0

这是我喜欢的一种方法:

parallelism = os.cpu_count()
num_todos = len(todos)

# this zip fanciness makes each chunk stripe through the data sequentially overall so that the
# first items still get done first across all the workers
chunksize = math.ceil(num_todos / parallelism)
chunks = list(itertools.zip_longest(*[todos[i:i+chunksize] for i in range(0, num_todos, chunksize)]))
chunks = [[c for c in chunk if c is not None] for chunk in chunks]

with Pool(processes=parallelism) as pool:
    tasks = [pool.apply_async(my_function, args=(chunk)) for chunk in chunks]
    [task.get() for task in tasks]

根据您是否需要累积结果,您可以进行调整,但对我来说有趣的部分是让工作人员协作以全局顺序完成工作(在我的情况下,处理图像的连续帧,以便我可以看到事情的样子所有的CPU都在启动)。

于 2017-10-12T01:57:32.897 回答