25

我正在尝试使用多处理的池来运行一组进程,每个进程都将运行一个 gevent 的 greenlets 池。这样做的原因是有很多网络活动,但也有很多 CPU 活动,所以为了最大化我的带宽和我所有的 CPU 内核,我需要多个进程和 gevent 的异步猴子补丁。我正在使用多处理的管理器创建一个队列,进程将访问该队列以获取要处理的数据。

这是代码的简化片段:

import multiprocessing

from gevent import monkey
monkey.patch_all(thread=False)

manager = multiprocessing.Manager()
q = manager.Queue()

这是它产生的异常:

Traceback (most recent call last):
  File "multimonkeytest.py", line 7, in <module>
    q = manager.Queue()
  File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 667, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 565, in _create
    conn = self._Client(self._address, authkey=self._authkey)
  File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 175, in Client
    answer_challenge(c, authkey)
  File "/usr/local/Cellar/python/2.7.2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 409, in answer_challenge
    message = connection.recv_bytes(256)         # reject large message
 IOError: [Errno 35] Resource temporarily unavailable

我相信这一定是由于普通 socket 模块和 gevent 的 socket 模块的行为之间存在一些差异。

如果我在子进程中进行monkeypatch,则队列创建成功,但是当子进程尝试从队列中获取()时,会发生非常相似的异常。由于在子进程中执行大量网络请求,因此确实需要对套接字进行修补。

我的 gevent 版本,我认为是最新的:

>>> gevent.version_info
(1, 0, 0, 'alpha', 3)

有任何想法吗?

4

5 回答 5

18

利用monkey.patch_all(thread=False, socket=False)

我在类似的情况下遇到了同样的问题,并将其追踪到函数gevent/monkey.py下的第 115 行patch_socket()_socket.socket = socket.socket. 注释掉这条线可以防止损坏。

这是 geventsocket用它自己的库替换 stdlib 库的地方。multiprocessing.connection相当广泛地使用该socket库,并且显然不能容忍这种变化。

具体来说,在您导入的模块执行gevent.monkey.patch_all()调用而不设置socket=False. 就我而言,就是grequests这样做的,我不得不覆盖套接字模块的补丁来修复这个错误。

于 2013-02-06T18:50:08.823 回答
9

不幸的是,在 gevent 的上下文中应用多处理会引发问题。但是,您的理由是合理的(“大量网络活动,但也有大量 CPU 活动”)。如果您愿意,请查看http://gehrcke.de/gipc。这主要是为您的用例设计的。使用 gipc,您可以轻松地生成一些完全感知 gevent 的子进程,并让它们通过管道相互协作和/或与父进程协作。

如果您有具体问题,欢迎回复我。

于 2013-02-11T20:06:17.917 回答
2

If you will use original Queue, then you code will work normally even with monkey patched socket.

import multiprocessing

from gevent import monkey
monkey.patch_all(thread=False)

q= multiprocessing.Queue()
于 2013-10-20T08:22:03.913 回答
1

写了一个替代的 Nose Multiprocess 插件——这个插件应该可以很好地与各种基于 Gevent 的疯狂补丁配合使用。

https://pypi.python.org/pypi/nose-gevented-multiprocess/

https://github.com/dvdotsenko/nose_gevent_multiprocess

  • 从工作进程切换multiprocess.fork到普通subprocess.popen进程(为我修复模块级错误共享对象问题)
  • 从 multiprocess.Queue 切换到基于 HTTP 的 JSON-RPC,用于主到客户端 RPC
  • 现在理论上可以允许将测试分布到多台机器上
于 2014-01-06T10:00:25.813 回答
1

您提供的代码适用于我在 Windows 7 上。

编辑:

删除了之前的答案,因为我已经在 Ubuntu 11.10 VPS 上尝试过你的代码,但我遇到了同样的错误。

貌似Eventlet也有这个问题

于 2011-12-30T13:21:55.510 回答