
ValueError when using Python's multiprocessing.pool.Pool with more than 63 cores

from multiprocessing.pool import Pool

def f(x):
    return x

if __name__ == '__main__':
    with Pool(70) as pool:
        arr = list(range(70))
        a = pool.map(f, arr)
        print(a)

Output:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "C:\Users\fischsam\Anaconda3\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\Users\fischsam\Anaconda3\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\fischsam\Anaconda3\lib\multiprocessing\pool.py", line 519, in _handle_workers
    cls._wait_for_updates(current_sentinels, change_notifier)
  File "C:\Users\fischsam\Anaconda3\lib\multiprocessing\pool.py", line 499, in _wait_for_updates
    wait(sentinels, timeout=timeout)
  File "C:\Users\fischsam\Anaconda3\lib\multiprocessing\connection.py", line 879, in wait
    ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
  File "C:\Users\fischsam\Anaconda3\lib\multiprocessing\connection.py", line 811, in _exhaustive_wait
    res = _winapi.WaitForMultipleObjects(L, False, timeout)
ValueError: need at most 63 handles, got a sequence of length 72
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69]

The program seems to run fine, and the result is exactly what I expected. Can I ignore the ValueError?

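Background: I have googled this issue, and it seems to be caused by a Python limitation on Windows (_winapi.WaitForMultipleObjects); see e.g. here. The suggested workaround is to limit the number of cores used to 63. That is not satisfactory, because I want to use more than 100 cores on my server. Do I really need to limit the number of cores? Why? Is there a workaround?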


1 Answer


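Interestingly, when I tried ProcessPoolExecutor(max_workers=70) from the concurrent.futures module to see whether the problem persists, I got ValueError: max_workers must be <= 61, and the program terminated immediately, before any jobs were submitted. This strongly suggests a Windows limitation that cannot be circumvented. However, your program never actually terminates; it just hangs after the exception is raised. On my 8-core computer, the program hangs whenever I specify more than 60 workers (not 61 or 63), regardless of whether I use multiprocessing.Pool or concurrent.futures.ProcessPoolExecutor. Find the largest number of workers that lets the program terminate normally on your machine without raising an exception, and stick to it.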
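A minimal sketch of "stick to it", assuming the 60-worker ceiling measured above is also the right value for your machine (it may not be; measure it). The names WINDOWS_SAFE_MAX_WORKERS and capped_worker_count are hypothetical helpers, not part of the standard library. The cap is applied only on Windows, so the same script can still use every core elsewhere.

```python
import os
import sys

# Assumption: 60 is the largest worker count the answer found to terminate
# cleanly on Windows; replace it with the value you measure on your machine.
WINDOWS_SAFE_MAX_WORKERS = 60


def capped_worker_count(requested=None, platform=sys.platform,
                        windows_max=WINDOWS_SAFE_MAX_WORKERS):
    """Return a worker count that stays at or below the Windows ceiling.

    `requested` defaults to os.cpu_count(); the cap only applies on Windows.
    """
    if requested is None:
        requested = os.cpu_count() or 1
    if platform == "win32":
        return min(requested, windows_max)
    return requested


if __name__ == "__main__":
    from multiprocessing import Pool

    n = capped_worker_count()  # all cores, but at most 60 on Windows
    with Pool(n) as pool:
        print(n, pool.map(abs, range(-3, 3)))
```

Passing `platform` explicitly is only there to make the function easy to check on any OS; in normal use the default `sys.platform` is what you want.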

from concurrent.futures import ProcessPoolExecutor

def f(x):
    return x

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=70) as executor:  # on Windows: ValueError: max_workers must be <= 61
        a = list(executor.map(f, range(70)))
        print(a)
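If more cores are needed in total, one possible approach (not from this answer, and not tested here on a Windows machine with more than 63 cores) is to keep every pool below the ceiling and run several pools side by side. The traceback in the question shows the failing wait happening inside a single pool's _handle_workers thread, so each smaller pool should only wait on its own handles. The sketch below assumes a per-pool cap of 60; PER_POOL_MAX, balanced_pool_sizes, split_evenly, multi_pool_map and f are hypothetical names.

```python
import os
from multiprocessing import Pool

# Assumption: at most 60 workers per pool, the largest count the answer found
# to terminate cleanly on Windows; measure your own value.
PER_POOL_MAX = 60


def balanced_pool_sizes(total_workers, per_pool=PER_POOL_MAX):
    """Split total_workers into as few pools as possible, each <= per_pool."""
    if total_workers < 1:
        raise ValueError("total_workers must be at least 1")
    n_pools = -(-total_workers // per_pool)  # ceiling division
    base, extra = divmod(total_workers, n_pools)
    # The first `extra` pools get one more worker, so sizes differ by at most 1.
    return [base + 1 if i < extra else base for i in range(n_pools)]


def split_evenly(items, n_parts):
    """Split a list into n_parts contiguous chunks whose lengths differ by at most 1."""
    k, m = divmod(len(items), n_parts)
    return [items[i * k + min(i, m):(i + 1) * k + min(i + 1, m)]
            for i in range(n_parts)]


def multi_pool_map(func, iterable, total_workers, per_pool=PER_POOL_MAX):
    """Like Pool.map, but spreads the work over several pools of <= per_pool workers."""
    items = list(iterable)
    sizes = balanced_pool_sizes(total_workers, per_pool)
    chunks = split_evenly(items, len(sizes))
    pools = [Pool(size) for size in sizes]
    try:
        # Submit to every pool first so that all pools work concurrently.
        pending = [pool.map_async(func, chunk) for pool, chunk in zip(pools, chunks)]
        results = []
        for job in pending:
            # Chunks are contiguous, so concatenating keeps the input order.
            results.extend(job.get())
        return results
    finally:
        for pool in pools:
            pool.close()
            pool.join()


def f(x):
    return x


if __name__ == "__main__":
    print(multi_pool_map(f, range(200), total_workers=os.cpu_count() or 1))
```

Because the input is cut into contiguous chunks, the combined result keeps the input order, as Pool.map does. Unlike a single Pool.map, work is not rebalanced between pools, so a pool that finishes its chunk early sits idle.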
answered 2020-12-14T14:03:20.327