19

当使用以下代码在 python 中使用 multiprocessing.Pool 时,会出现一些奇怪的行为。

from multiprocessing import Pool
p = Pool(3)
def f(x): return x
threads = [p.apply_async(f, [i]) for i in range(20)]
for t in threads:
    try: print(t.get(timeout=1))
    except Exception: pass

我收到以下错误三次(池中的每个线程一个),它打印“3”到“19”:

AttributeError: 'module' object has no attribute 'f'

前三个 apply_async 调用永远不会返回。

同时,如果我尝试:

from multiprocessing import Pool
p = Pool(3)
def f(x): print(x)
p.map(f, range(20))

我得到 AttributeError 3 次,shell 打印“6”到“19”,然后挂起并且无法被 [Ctrl] + [C] 杀死

多处理文档有以下内容:

此包中的功能要求子模块可以导入主模块。

这是什么意思?

澄清一下,我在终端中运行代码来测试功能,但最终我希望能够将它放入 Web 服务器的模块中。如何在 python 终端和代码模块中正确使用 multiprocessing.Pool?

4

3 回答 3

39

警告:多处理是在 Django 和 Flask 等 Web 服务器的上下文中使用的错误工具。相反,您应该使用像Celery这样的任务框架或像Elastic Beanstalk Worker Environments这样的基础设施解决方案。使用多处理来生成线程或进程是不好的,因为它不会让您监督或管理这些线程/进程,因此您必须构建自己的故障检测逻辑、重试逻辑等。此时,您最好使用一个现成的工具,实际上是为处理异步任务而设计的,因为它会给你这些开箱即用的东西。


了解文档

包中的功能要求子模块可以导入主模块。

这意味着必须在定义要在其上运行的函数之后初始化池。如果您正在编写一个独立的脚本,那么在块中使用池if __name__ == "__main__":是可行的,但这在更大的代码库或服务器代码(例如 Django 或 Flask 项目)中是不可能的。因此,如果您尝试在其中之一中使用池,请确保遵循以下指南,这些指南将在以下部分中进行说明:

  1. 尽可能在函数内部初始化池。如果您必须在全局范围内初始化它们,请在模块底部进行。
  2. 不要在全局范围内调用 Pool 的方法。

或者,如果您只需要更好的 I/O 并行性(例如数据库访问或网络调用),您可以省去所有这些麻烦,并使用线程池而不是进程池。这涉及完全无证的:

from multiprocessing.pool import ThreadPool

它的接口与 Pool 的接口完全相同,但由于它使用线程而不是进程,因此它没有使用进程池所做的任何警告,唯一的缺点是您无法获得真正的代码执行并行性,只是阻塞 I/O 的并行性。


必须在定义要在其上运行的函数之后初始化池

python 文档中难以理解的文本意味着在定义池时,池中的线程会导入周围的模块。在 python 终端的情况下,这意味着您到目前为止运行的所有且唯一的代码。

因此,您想在池中使用的任何函数都必须在初始化池之前定义。模块中的代码和终端中的代码都是如此。问题中代码的以下修改将正常工作:

from multiprocessing import Pool
def f(x): return x  # FIRST
p = Pool(3) # SECOND
threads = [p.apply_async(f, [i]) for i in range(20)]
for t in threads:
    try: print(t.get(timeout=1))
    except Exception: pass

或者

from multiprocessing import Pool
def f(x): print(x)  # FIRST
p = Pool(3) # SECOND
p.map(f, range(20))

很好,我的意思是在 Unix 上很好。Windows 有它自己的问题,我不在这里讨论。


在模块中使用池

但是等等,还有更多(在您想在其他地方导入的模块中使用池)!

如果在函数内定义池,则没有问题。 但是如果你使用 Pool 对象作为模块中的全局变量,它必须定义在页面的底部,而不是顶部。尽管这与大多数优秀的代码风格背道而驰,但它对于功能来说是必要的。使用在页面顶部声明的池的方法是仅将其与从其他模块导入的函数一起使用,如下所示:

from multiprocessing import Pool
from other_module import f
p = Pool(3)
p.map(f, range(20))

从另一个模块导入预配置的池是非常可怕的,因为导入必须在您想要在其上运行的任何内容之后进行,如下所示:

### module.py ###
from multiprocessing import Pool
POOL = Pool(5)

### module2.py ###
def f(x):
    # Some function
from module import POOL
POOL.map(f, range(10))

其次,如果您在要导入的模块的全局范围内的池中运行任何东西,系统就会挂起。即这不起作用

### module.py ###
from multiprocessing import Pool
def f(x): return x
p = Pool(1)
print(p.map(f, range(5)))

### module2.py ###
import module

但是,这确实有效,只要没有导入模块 2:

### module.py ###
from multiprocessing import Pool

def f(x): return x
p = Pool(1)
def run_pool(): print(p.map(f, range(5)))

### module2.py ###
import module
module.run_pool()

现在,这背后的原因更加奇怪,并且可能与问题中的代码每次只吐出一次属性错误以及之后似乎正确执行代码的原因有关。似乎池线程(至少具有一定的可靠性)在执行后重新加载模块中的代码。

于 2013-09-23T15:10:36.243 回答
4

在创建线程池时必须已经定义了要在线程池上执行的函数。

这应该有效:

from multiprocessing import Pool
def f(x): print(x)
if __name__ == '__main__':
    p = Pool(3)
    p.map(f, range(20))

原因是(至少在具有 的系统上fork)当您创建池时,工作人员是通过分叉当前进程来创建的。因此,如果此时尚未定义目标函数,则工作人员将无法调用它。

在 windows 上有点不同,因为 windows 没有fork. 在这里启动新的工作进程并导入主模块。这就是为什么在 Windows 上使用if __name__ == '__main__'. 否则,每个新工作人员都会重新执行代码,因此会无限生成新进程,从而使程序(或系统)崩溃。

于 2013-09-22T19:36:51.023 回答
0

此错误还有另一个可能的来源。运行示例代码时出现此错误。

消息来源是,尽管正确安装了 multiprosessing,但我的系统上没有安装 C++ 编译器,pip 在尝试更新 multiprocessing 时通知了我。所以检查编译器是否安装可能是值得的。

于 2016-11-30T14:06:18.443 回答