2

我正在尝试使用cherrypy + multiprocessing(启动工作进程'进程')+ gevent(从工作进程'进程'内启动并行i/o greenlets)的组合。似乎最简单的方法是monkeypatch multiprocessing,因为greenlets只能在主应用程序进程中运行。

但是,看起来猴子修补程序适用于多处理的某些部分,而不适用于其他部分。这是我的示例 CherryPy 服务器:

from gevent import monkey
monkey.patch_all()

import gevent
import cherrypy
import multiprocessing

def launch_testfuncs():
    jobs = [gevent.spawn(testfunc)
            for i in range(0, 12)]

    gevent.joinall(jobs, timeout=10)

def testfunc():
    print 'testing'

class HelloWorld(object):
    def index(self):
        launch_testfuncs()

        return "Hello World!"
    index.exposed = True

    def index_proc(self):
        proc = multiprocessing.Process(target=launch_testfuncs)
        proc.start()
        proc.join()

        return "Hello World 2!"
    index_proc.exposed = True

    def index_pool(self):
        pool = multiprocessing.Pool(1)
        return "Hello World 3!"
    index_pool.exposed = True

    def index_namespace(self):
        manager = multiprocessing.Manager()
        anamespace = manager.Namespace()
        anamespace.val = 23
        return "Hello World 4!"
    index_namespace.exposed = True


cherrypy.quickstart(HelloWorld())

猴子修补后的以下工作:

  • index- 直接从cherrypy类中产生greenlets
  • index_proc- 用于multiprocessing.Process启动一个新进程,然后从该进程中生成 greenlets

以下有问题:

  • index_pool- 启动multiprocessing.Pool-挂起并且永不返回
  • index_namespace- 初始化multiprocessing.Manager命名空间以管理池/工人集合中的共享内存 -返回以下错误消息:

    [15/Nov/2012:17:19:31] HTTP Traceback (most recent call last):
      File "/Library/Python/2.7/site-packages/cherrypy/_cprequest.py", line 656, in respond
    response.body = self.handler()
      File "/Library/Python/2.7/site-packages/cherrypy/lib/encoding.py", line 188, in __call__
    self.body = self.oldhandler(*args, **kwargs)
      File "/Library/Python/2.7/site-packages/cherrypy/_cpdispatch.py", line 34, in __call__
    return self.callable(*self.args, **self.kwargs)
      File "server.py", line 39, in index_namespace
    anamespace = manager.Namespace()
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 667, in temp
    token, exp = self._create(typeid, *args, **kwds)
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 565, in _create
    conn = self._Client(self._address, authkey=self._authkey)
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 175, in Client
    answer_challenge(c, authkey)
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 414, in answer_challenge
    response = connection.recv_bytes(256)        # reject large message
    IOError: [Errno 35] Resource temporarily unavailable
    

我尝试在 gevent docs 中找到一些与此相关的文档,但找不到与此相关的任何内容。只是gevent的猴子补丁不完整吗?有没有其他人有类似的问题,有没有办法解决它?

4

2 回答 2

1

这个问题似乎是gevent.socket非阻塞的结果,这意味着如果字节在套接字上没有立即可用,任何socket.recv_bytes(X)调用都会抛出该错误。X具体来说,它gevent.socket的设计永远不会阻塞套接字。

问题的multiprocessing出现是因为它使用 stdlibsocket模块并期望它被阻塞,而在你monkey.patch_all()'d 之后,socket模块已被替换,并且multiprocessing.connection不是为处理新的异步行为而设计的。

您可以告诉monkey不要修补socket,但这意味着在您的应用程序中利用异步套接字的任何东西都可能因此而导致一些性能损失。

patch_all使用socket=False:进行此调用patch_all(socket=False)

这不是一个理想的解决方案,因为您几乎失去了从gevent一开始就可以使用的大部分好处。

于 2013-02-06T19:52:43.943 回答
1

我遇到了和你一样的问题。我的解决方案是我刚刚使用 multiprocessing.Process() 来生成固定数量的进程。最后全部加入,等待完成。

#!/usr/bin/env python
# encoding: utf-8

from gevent import monkey
monkey.patch_all()

import gevent
import multiprocessing as mp


NUM = 10


def work(i):
    jobs = [gevent.spawn(func, i)
            for i in range(0, 12)]
    gevent.joinall(jobs)
    print "{} Done {}".format(mp.current_process().name, i)


def func(x):
    print "Gevent: {}".format(x)

def main():

    processes = [mp.Process(name="Process-{}".format(i), target=work, args=(i,)) for i in xrange(NUM)]

    for process in processes:
        process.start()

    for process in processes:
        process.join()


if __name__ == '__main__':
    main()

输出

Gevent: 0
Gevent: 1
Gevent: 2
Gevent: 3
Gevent: 4
Gevent: 5
Gevent: 6
Gevent: 7
Gevent: 8
Gevent: 9
Gevent: 10
Gevent: 11
Process-0 Done 11
Gevent: 0
Gevent: 1
Gevent: 2
Gevent: 3
Gevent: 4
Gevent: 5
Gevent: 6
Gevent: 7
Gevent: 8
Gevent: 9
Gevent: 10
Gevent: 11
Process-1 Done 11
Gevent: 0
Gevent: 1
Gevent: 2
Gevent: 3
Gevent: 4
Gevent: 5
Gevent: 6
Gevent: 7
Gevent: 8
Gevent: 9
Gevent: 10
Gevent: 11
Process-2 Done 11
Gevent: 0
... ...

所以这是 gevent 与多处理一起工作的解决方案。

于 2015-12-09T12:07:08.983 回答