2

我正在执行以下代码并且它工作正常,但它不会产生到不同的进程,而是有时都在同一个进程中运行,有时是一种进程中的二合一。我正在使用4 cpu机器。这段代码有什么问题?

def f(values):
    print(multiprocessing.current_process())
    return values

def main():
    p = Pool(4) #number of processes = number of CPUs
    keys, values= zip(*data.items()) #ordered keys and values
    processed_values= p.map( f, values )
    result= dict( zip(keys, processed_values ) ) 
    p.close() # no more tasks
    p.join()  # wrap up current tasks

结果是

<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>

有时像这样,

<SpawnProcess(SpawnPoolWorker-3, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-3, started daemon)>

有时,

<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-4, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>

我的问题是,它在什么基础上将功能分配给工人?我正在编写代码,它根据我的字典中的键数来决定进程的数量(考虑到我的数据总是比我的 CPU 有更少的键)。我的代码将开始 - 主代码读取文件并使用单个进程从中制作字典,并将其分支到并发进程的数量并等待它们处理数据(我正在使用 pool.map ),然后一旦获得子进程的结果,它就会开始处理它们。我怎样才能实现这个父等待子进程步骤?

4

1 回答 1

5

你的代码没有问题。您的工作项目非常快 - 如此之快,以至于同一个工作进程有可能运行该函数,返回结果,然后赢得竞争,从multiprocessing.Pool用于分配工作的内部队列中消费下一个任务。当您调用map时,工作项被分成批次并放入一个Queue. 这是pool.map将可迭代项中的项目分块并将它们放入队列的部分实现:

    task_batches = Pool._get_tasks(func, iterable, chunksize)
    result = MapResult(self._cache, chunksize, len(iterable), callback)
    self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 
                          for i, x in enumerate(task_batches)), None))

每个工作进程运行一个函数,该函数具有一个无限循环,该循环使用该队列中的项目*:

while maxtasks is None or (maxtasks and completed < maxtasks):
    try:
        task = get()  # Pulls an item off the taskqueue
    except (EOFError, IOError):
        debug('worker got EOFError or IOError -- exiting')
        break

    if task is None:
        debug('worker got sentinel -- exiting')
        break

    job, i, func, args, kwds = task
    try:
        result = (True, func(*args, **kwds))  # Runs the function you passed to map
    except Exception, e:
        result = (False, e)
    try:
        put((job, i, result))  # Sends the result back to the parent
    except Exception as e:
        wrapped = MaybeEncodingError(e, result[1])
        debug("Possible encoding error while sending result: %s" % (
            wrapped))

很可能同一个工人只是偶然能够消费一个项目,运行func,然后消费下一个项目。这有点奇怪 - 我无法在运行与您的示例相同的代码的机器上重现它 - 但是让同一个工作人员从队列中抓取四个项目中的两个是很正常的

如果你让你的工作函数花费更长的时间,你应该总是看到均匀分布,通过插入调用time.sleep

def f(values):
    print(multiprocessing.current_process())
    time.sleep(1)
    return values

* 这实际上并不完全正确 - 有一个线程在主进程中运行,它消耗 from taskqueue,然后将它拉出的内容粘贴到另一个Queue中,就是子进程消耗的内容)

于 2014-09-18T00:30:29.163 回答