2

我正在尝试运行多个进程,同时使用concurrent.futures.ProcessPoolExecutor来运行 CPU 密集型作业。前几个请求得到了愉快的服务,但随后KeyError从 引发了a concurrent.futures.process,并且服务器挂起。

这是 Tornado 中的错误吗?

这是我剥离代码的最简单形式。

服务器:

# -*- coding: utf-8 -*-
"""
server runs 2 processes and does job on a ProcessPoolExecutor
"""
import tornado.web
import tornado.ioloop
import tornado.gen
import tornado.options
import tornado.httpserver

from concurrent.futures import ProcessPoolExecutor


class MainHandler(tornado.web.RequestHandler):

    executor = ProcessPoolExecutor(1)

    @tornado.gen.coroutine
    def post(self):
        num = int(self.request.body)
        result = yield self.executor.submit(pow, num, 2)
        self.finish(str(result))


application = tornado.web.Application([
    (r"/", MainHandler),
])


def main():
    tornado.options.parse_command_line()
    server = tornado.httpserver.HTTPServer(application)
    server.bind(8888)
    server.start(2)
    tornado.ioloop.IOLoop.instance().start()


if __name__ == '__main__':
    main()

客户:

# -*- coding: utf-8 -*-
"""
client
"""
from tornado.httpclient import AsyncHTTPClient
from tornado.gen import coroutine
from tornado.ioloop import IOLoop


@coroutine
def remote_compute(num):
    rsp = yield AsyncHTTPClient().fetch(
        'http://127.0.0.1:8888', method='POST', body=str(num))
    print 'result:', rsp.body


IOLoop.instance().run_sync(lambda: remote_compute(10))

错误回溯

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.7_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/local/Cellar/python/2.7.7_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/Users/cliffxuan/.virtualenvs/executor/lib/python2.7/site-packages/concurrent/futures/process.py", line 216, in _queue_management_worker
    work_item = pending_work_items[result_item.work_id]
KeyError: 0
4

1 回答 1

7

tornadoconcurrent.futures使用. tornado_ server.start(2)这在内部用于os.fork()创建两个进程。因为您将 声明Executor为类变量,所以在实际运行MainHandler之前执行类本身时,它会被实例化。server.start()这意味着两个进程最终共享一个(尽管是分叉的)ProcessPoolExecutor实例。这导致了一些奇怪的现象——每个进程在Executor.

ProcessPoolExecutor不支持像这样在进程之间共享,所以当第二个进程尝试使用Executor. 您可以通过仅在发生Executor 创建以下方法来解决它:fork

class MainHandler(tornado.web.RequestHandler):
    executor = None # None for now

    @tornado.gen.coroutine
    def post(self):
        num = int(self.request.body)
        result = yield self.executor.submit(pow, num, 2)
        self.finish(str(result))


application = tornado.web.Application([
    (r"/", MainHandler),
])


def main():
    tornado.options.parse_command_line()
    server = tornado.httpserver.HTTPServer(application)
    server.bind(8889)
    server.start(2) # We fork here
    MainHandler.executor = ProcessPoolExecutor(1) # Now we can create the Executor
    tornado.ioloop.IOLoop.instance().start()
于 2014-10-14T21:27:54.380 回答