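For operations in my Tornado server that are expected to block (and that can't easily be modified to use something like Tornado's asynchronous HTTP request client), I have been offloading the work to separate worker processes with the multiprocessing module. Specifically, I was using a multiprocessing Pool because it provides a method called apply_async, which works very well with Tornado since it takes a callback as one of its arguments.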
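For readers who haven't used it, the apply_async pattern looks roughly like the sketch below. This is an illustration only, not code from my server: blocking_task and run_with_callback are made-up names, and it assumes Python 3 on a Unix-like system (it asks for the fork start method explicitly), not the Python 2.6 setup described in this question.

```python
import multiprocessing as mp
import time


def blocking_task(seconds):
    """Hypothetical stand-in for a blocking operation."""
    time.sleep(seconds)
    return "slept for {0} s".format(seconds)


def run_with_callback(seconds):
    """Run blocking_task in a pool worker and collect its result via a callback."""
    results = []
    ctx = mp.get_context("fork")  # assumption: Python 3 on a Unix-like OS
    pool = ctx.Pool(processes=2)  # the pool size is fixed up front
    try:
        # apply_async returns immediately. The callback later receives the
        # task's return value; it runs in the parent process, on one of the
        # pool's helper threads.
        pool.apply_async(blocking_task, (seconds,), callback=results.append)
    finally:
        pool.close()  # no more tasks will be submitted
        pool.join()   # demo only: a server would not block here
    return results
```

In a real handler the callback would be the method that finishes the request, and nothing would call join() on the request path.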
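I recently realized that a pool preallocates its number of processes, so if all of them become blocked, operations that need a new process will have to wait. I do realize that the server can still take connections, since apply_async works by adding things to a task queue and finishes immediately itself, but I'm looking to spawn n processes for the n blocking tasks I need to perform.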
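To show what "n processes for n blocking tasks" means here, a minimal sketch with plain Process objects and a Queue follows. The names worker and one_process_per_task are hypothetical, and as an assumption it targets Python 3 on a Unix-like system (explicit fork start method). It leaves out the Tornado side entirely.

```python
import multiprocessing as mp
import os
import time


def worker(value, queue):
    """Hypothetical blocking task: sleep briefly, then report back."""
    time.sleep(0.1)
    queue.put((value, os.getpid()))


def one_process_per_task(values):
    """Start one Process per value and collect (value, pid) pairs."""
    ctx = mp.get_context("fork")  # assumption: Python 3 on a Unix-like OS
    queue = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(v, queue)) for v in values]
    for proc in procs:
        proc.start()  # no pool size limit: every task gets its own process
    # Read the results before joining so no child is left waiting on the queue.
    results = [queue.get() for _ in procs]
    for proc in procs:
        proc.join()
    return sorted(results)
```

Unlike a pool, nothing here caps the number of concurrent processes, so a busy server would need its own limit.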
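I figured I could use the add_handler method of my Tornado server's IOLoop to add a handler to that IOLoop for each new PID I create. I've done something similar before, but it used popen and an arbitrary command. An example of that approach is here. I want to pass arguments into an arbitrary target Python function within my scope, though, so I want to stick with multiprocessing.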
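However, something doesn't seem to like the PIDs that my multiprocessing.Process objects have. I get IOError: [Errno 9] Bad file descriptor. Are these processes restricted somehow? I know the PID isn't available until I actually start the process, but I do start the process. Here's the source code of an example I've made that demonstrates this issue: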
#!/usr/bin/env python

"""Creates a small Tornado program to demonstrate asynchronous programming.

Specifically, this demonstrates using the multiprocessing module."""

import tornado.httpserver
import tornado.ioloop
import tornado.web
import multiprocessing as mp
import random
import time

__author__ = 'Brian McFadden'
__email__ = 'brimcfadden@gmail.com'


def sleepy(queue):
    """Pushes a string to the queue after sleeping for 5 seconds.

    This sleeping can be thought of as a blocking operation."""
    time.sleep(5)
    queue.put("Now I'm awake.")
    return


def random_num():
    """Returns a string containing a random number.

    This function can be used by handlers to receive text for writing which
    facilitates noticing change on the webpage when it is refreshed."""
    n = random.random()
    return "<br />Here is a random number to show change: {0}".format(n)


class SyncHandler(tornado.web.RequestHandler):
    """Demonstrates handling a request synchronously.

    It executes sleepy() before writing some more text and a random number to
    the webpage. While the process is sleeping, the Tornado server cannot
    handle any requests at all."""

    def get(self):
        q = mp.Queue()
        sleepy(q)
        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by SyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())


class AsyncHandler(tornado.web.RequestHandler):
    """Demonstrates handling a request asynchronously.

    It executes sleepy() before writing some more text and a random number to
    the webpage. It passes the sleeping function off to another process using
    the multiprocessing module in order to handle more requests concurrently to
    the sleeping, which is like a blocking operation."""

    @tornado.web.asynchronous
    def get(self):
        """Handles the original GET request (normal function delegation).

        Instead of directly invoking sleepy(), it passes a reference to the
        function to the multiprocessing pool."""
        # Create an interprocess data structure, a queue.
        q = mp.Queue()
        # Create a process for the sleepy function. Provide the queue.
        p = mp.Process(target=sleepy, args=(q,))
        # Start it, but don't use p.join(); that would block us.
        p.start()
        # Add our callback function to the IOLoop. The async_callback wrapper
        # makes sure that Tornado sends an HTTP 500 error to the client if an
        # uncaught exception occurs in the callback.
        iol = tornado.ioloop.IOLoop.instance()
        print "p.pid:", p.pid
        iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)

    def _finish(self, q):
        """This is the callback for post-sleepy() request handling.

        Operation of this function occurs in the original process."""
        val = q.get()
        self.write(val)
        self.write('<br />Brought to you by AsyncHandler.')
        self.write('<br />Try refreshing me and then the main page.')
        self.write(random_num())
        # Asynchronous handling must be manually finished.
        self.finish()


class MainHandler(tornado.web.RequestHandler):
    """Returns a string and a random number.

    Try to access this page in one window immediately after (<5 seconds of)
    accessing /async or /sync in another window to see the difference between
    them. Asynchronously performing the sleepy() function won't make the client
    wait for data from this handler, but synchronously doing so will!"""

    def get(self):
        self.write('This is just responding to a simple request.')
        self.write('<br />Try refreshing me after one of the other pages.')
        self.write(random_num())


if __name__ == '__main__':
    # Create an application using the above handlers.
    application = tornado.web.Application([
        (r"/", MainHandler),
        (r"/sync", SyncHandler),
        (r"/async", AsyncHandler),
    ])
    # Create a single-process Tornado server from the application.
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
    print 'The HTTP server is listening on port 8888.'
    tornado.ioloop.IOLoop.instance().start()
Here is the traceback:
Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 810, in _stack_context
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/stack_context.py", line 77, in StackContext
    yield
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 827, in _execute
    getattr(self, self.request.method.lower())(*args, **kwargs)
  File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 909, in wrapper
    return method(self, *args, **kwargs)
  File "./process_async.py", line 73, in get
    iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)
  File "/usr/local/lib/python2.6/dist-packages/tornado/ioloop.py", line 151, in add_handler
    self._impl.register(fd, events | self.ERROR)
IOError: [Errno 9] Bad file descriptor
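The above code is actually modified from an older example that used process pools. I've had it saved for reference for my coworkers and myself for quite a while now (hence the heavy amount of comments). I constructed it so that I could open two small browser windows side by side to demonstrate to my boss that the /sync URI blocks connections while /async allows more connections. For the purposes of this question, all you need to do to reproduce it is try to access the /async handler. It errors immediately.

What should I do about this? How can the PID be "bad"? If you run the program, you can see it printed to STDOUT.

For the record, I'm using Python 2.6.5 on Ubuntu 10.04. Tornado is 1.1.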
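A smaller sketch that produces the same errno without Tornado may help narrow this down. It assumes Linux and select.epoll, which is an assumption about what sits behind the self._impl.register(fd, ...) call in the traceback, and the names register_errno and demo are made up for illustration. It does not involve a PID at all; it only shows that registering an integer that is not an open file descriptor in the current process fails with errno 9, while an open descriptor registers fine.

```python
import errno
import os
import select


def register_errno(poller, fd):
    """Register fd for reads; return None on success, else the errno."""
    try:
        poller.register(fd, select.EPOLLIN)
        poller.unregister(fd)
        return None
    except (IOError, OSError) as exc:  # IOError on Python 2, OSError on 3
        return exc.errno


def demo():
    """Return (result for an open fd, result for the same integer once closed)."""
    poller = select.epoll()  # assumption: Linux only
    try:
        read_end, write_end = os.pipe()
        ok = register_errno(poller, read_end)   # an open descriptor
        os.close(read_end)
        os.close(write_end)
        bad = register_errno(poller, read_end)  # same integer, now closed
        return ok, bad
    finally:
        poller.close()
```

Calling demo() should give None for the open pipe end and errno.EBADF (9) once that same integer no longer refers to an open descriptor.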