你可以做的比这更简单。这是一个向子进程提交四个慢速函数调用并等待它们的协程:
from concurrent.futures import ProcessPoolExecutor
from time import sleep
from tornado import gen, ioloop
pool = ProcessPoolExecutor()
def calculate_slowly(x):
sleep(x)
return x
async def parallel_tasks():
# Create futures in a randomized order.
futures = [gen.convert_yielded(pool.submit(calculate_slowly, i))
for i in [1, 3, 2, 4]]
wait_iterator = gen.WaitIterator(*futures)
while not wait_iterator.done():
try:
result = await wait_iterator.next()
except Exception as e:
print("Error {} from {}".format(e, wait_iterator.current_future))
else:
print("Result {} received from future number {}".format(
result, wait_iterator.current_index))
ioloop.IOLoop.current().run_sync(parallel_tasks)
它输出:
Result 1 received from future number 0
Result 2 received from future number 2
Result 3 received from future number 1
Result 4 received from future number 3
你可以看到协程按照它们完成的顺序接收结果,而不是它们提交的顺序:future number 1 在future number 2 之后解析,因为future number 1 睡得更久。convert_yielded 将 ProcessPoolExecutor 返回的 Futures 转换为 Tornado 兼容的 Futures,可以在协程中等待。
每个future 解析为calculate_slowly 返回的值:在这种情况下,它与传递给calculate_slowly 的数字相同,并且与calculate_slowly 休眠的秒数相同。
要将其包含在 RequestHandler 中,请尝试以下操作:
class MainHandler(web.RequestHandler):
async def get(self):
self.write("Starting....\n")
self.flush()
futures = [gen.convert_yielded(pool.submit(calculate_slowly, i))
for i in [1, 3, 2, 4]]
wait_iterator = gen.WaitIterator(*futures)
while not wait_iterator.done():
result = await wait_iterator.next()
self.write("Result {} received from future number {}\n".format(
result, wait_iterator.current_index))
self.flush()
if __name__ == "__main__":
application = web.Application([
(r"/", MainHandler),
])
application.listen(8888)
ioloop.IOLoop.instance().start()
您可以观察curl localhost:8888
服务器是否以增量方式响应客户端请求。