我正在使用 I/O 非阻塞 python 服务器 Tornado。我有一类GET
请求可能需要很长时间才能完成(想想在 5-10 秒的范围内)。问题是 Tornado 会阻止这些请求,因此后续的快速请求会被阻止,直到慢速请求完成。
我查看了:https ://github.com/facebook/tornado/wiki/Threading-and-concurrency并得出结论,我想要#3(其他进程)和#4(其他线程)的某种组合。#4 本身就有问题,当有另一个线程在执行“heavy_lifting”时,我无法将可靠的控制权返回给 ioloop。(我认为这是由于 GIL 以及 heavy_lifting 任务具有高 CPU 负载并不断将控制权从主 ioloop 中拉出的事实,但这是一个猜测)。
因此,我一直在设计如何解决这个问题的原型,方法是在单独的进程中在这些缓慢的请求中执行“繁重的”任务GET
,然后在进程完成时将回调放回 Tornado ioloop 以完成请求。这释放了 ioloop 来处理其他请求。
我创建了一个简单的示例来演示一个可能的解决方案,但我很想从社区中获得反馈。
我的问题有两个:如何简化当前的方法?它可能存在哪些陷阱?
该方法
利用 Tornado 的内置
asynchronous
装饰器,它允许请求保持打开状态并让 ioloop 继续。multiprocessing
使用 python 的模块为“繁重”任务生成一个单独的进程。我首先尝试使用该threading
模块,但无法将任何可靠的控制权交还给 ioloop。看来这mutliprocessing
也将利用多核。使用模块在主 ioloop 进程中启动一个“观察者”线程,该
threading
模块的工作是在完成multiprocessing.Queue
时观察“繁重”任务的结果。这是必要的,因为我需要一种方法来知道 heavy_lifting 任务已经完成,同时仍然能够通知 ioloop 这个请求现在已经完成。确保“观察者”线程经常通过调用将控制权交给主 ioloop 循环,
time.sleep(0)
以便继续轻松处理其他请求。当队列中有结果时,然后从“观察者”线程添加一个回调,使用
tornado.ioloop.IOLoop.instance().add_callback()
它被记录为从其他线程调用 ioloop 实例的唯一安全方法。请务必
finish()
在回调中调用以完成请求并提交回复。
下面是一些显示这种方法的示例代码。 multi_tornado.py
是实现上述大纲的服务器,call_multi.py
是一个示例脚本,它以两种不同的方式调用服务器来测试服务器。两个测试都使用 3 个慢速GET
请求和 20 个快速GET
请求调用服务器。结果显示在打开和未打开线程的情况下运行。
在“无线程”运行它的情况下,3 个慢速请求块(每个需要一秒钟多一点的时间才能完成)。20 个快速请求中有几个挤在 ioloop 中的一些慢速请求之间(不完全确定这是如何发生的 - 但可能是我在同一台机器上同时运行服务器和客户端测试脚本的工件)。这里的要点是所有快速请求都在不同程度上受到了阻碍。
在启用线程运行的情况下,20 个快速请求首先立即完成,三个慢速请求在之后大约同时完成,因为它们每个都并行运行。这是期望的行为。三个慢速请求并行完成需要 2.5 秒 - 而在非线程情况下,三个慢速请求总共需要大约 3.5 秒。所以总体上大约有 35% 的加速(我假设是由于多核共享)。但更重要的是 - 快速请求立即以慢速请求的 leu 处理。
我在多线程编程方面没有很多经验 - 所以虽然这似乎在这里有效,但我很想学习:
有没有更简单的方法来实现这一点?这种方法中可能潜伏着什么怪物?
(注意:未来的权衡可能是只运行更多的 Tornado 实例,使用像 nginx 这样的反向代理进行负载平衡。无论我将使用负载平衡器运行多个实例 - 但我担心只是在这个问题上抛出硬件因为在阻塞方面,硬件似乎与问题直接相关。)
示例代码
multi_tornado.py
(示例服务器):
import time
import threading
import multiprocessing
import math
from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop
# run in some other process - put result in q
def heavy_lifting(q):
t0 = time.time()
for k in range(2000):
math.factorial(k)
t = time.time()
q.put(t - t0) # report time to compute in queue
class FastHandler(RequestHandler):
def get(self):
res = 'fast result ' + self.get_argument('id')
print res
self.write(res)
self.flush()
class MultiThreadedHandler(RequestHandler):
# Note: This handler can be called with threaded = True or False
def initialize(self, threaded=True):
self._threaded = threaded
self._q = multiprocessing.Queue()
def start_process(self, worker, callback):
# method to start process and watcher thread
self._callback = callback
if self._threaded:
# launch process
multiprocessing.Process(target=worker, args=(self._q,)).start()
# start watching for process to finish
threading.Thread(target=self._watcher).start()
else:
# threaded = False just call directly and block
worker(self._q)
self._watcher()
def _watcher(self):
# watches the queue for process result
while self._q.empty():
time.sleep(0) # relinquish control if not ready
# put callback back into the ioloop so we can finish request
response = self._q.get(False)
IOLoop.instance().add_callback(lambda: self._callback(response))
class SlowHandler(MultiThreadedHandler):
@asynchronous
def get(self):
# start a thread to watch for
self.start_process(heavy_lifting, self._on_response)
def _on_response(self, delta):
_id = self.get_argument('id')
res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
print res
self.write(res)
self.flush()
self.finish() # be sure to finish request
application = Application([
(r"/fast", FastHandler),
(r"/slow", SlowHandler, dict(threaded=False)),
(r"/slow_threaded", SlowHandler, dict(threaded=True)),
])
if __name__ == "__main__":
application.listen(8888)
IOLoop.instance().start()
call_multi.py
(客户测试员):
import sys
from tornado.ioloop import IOLoop
from tornado import httpclient
def run(slow):
def show_response(res):
print res.body
# make 3 "slow" requests on server
requests = []
for k in xrange(3):
uri = 'http://localhost:8888/{}?id={}'
requests.append(uri.format(slow, str(k + 1)))
# followed by 20 "fast" requests
for k in xrange(20):
uri = 'http://localhost:8888/fast?id={}'
requests.append(uri.format(k + 1))
# show results as they return
http_client = httpclient.AsyncHTTPClient()
print 'Scheduling Get Requests:'
print '------------------------'
for req in requests:
print req
http_client.fetch(req, show_response)
# execute requests on server
print '\nStart sending requests....'
IOLoop.instance().start()
if __name__ == '__main__':
scenario = sys.argv[1]
if scenario == 'slow' or scenario == 'slow_threaded':
run(scenario)
试验结果
通过运行python call_multi.py slow
(阻塞行为):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20
通过运行python call_multi.py slow_threaded
(所需的行为):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s