我试图让 Tornado 在超时后停止而不阻止任何现有功能。我可能错过了 Tornado 约定,但无论我使用 spawn_callback、Task 还是 Thread,我似乎都在阻止主循环。
首先,我这样做的原因是因为我想在客户端应用程序中使用世界著名的 NATS 消息总线来发布消息(不是通常的直接 HTTP 功能),然后等待订阅的响应。异步行为的典型问题,官方 NATS Python 客户端使用 Tornado,所以我也在尝试使用它。
我怀疑我的问题与理解 tornado.gen.coroutine 装饰器如何与线程一起工作有关。
我的代码片段如下。如果有人注意到我的明显问题,我将不胜感激。谢谢!
class Delayed(Thread):
def __init__(self, callback=None, timeout=2, *args, **kwargs):
super(Delayed, self).__init__(*args, **kwargs)
self.callback = callback
self.timeout = timeout
def run(self):
time.sleep(self.timeout)
if self.callback != None:
self.callback()
def timeout_task(timeout_secs=2):
time.sleep(timeout_secs)
ioloop.IOLoop.instance().stop()
yield
@tornado.gen.coroutine
def main():
parser = argparse.ArgumentParser()
parser.add_argument('CommandType')
...
parser.add_argument('-s', '--servers', default=[], action='append')
args = parser.parse_args()
try:
timeout=args.wait
servers = args.servers
queue = ""
...
if len(args.servers) < 1:
servers = ["nats://127.0.0.1:4222"]
data = funct_to_get_data()
nc = NATS()
opts = { "servers": servers }
yield nc.connect(**opts)
def self_stop():
ioloop.IOLoop.instance().stop()
def handler(msg):
print("[Received: {0}] {1}".format(msg.subject, msg.data))
print("Subscribed to '{0}'".format(subject))
future = nc.subscribe(subject, queue, handler)
sid = future.result()
yield nc.publish(subject, data)
yield nc.flush()
print("Published to '{0}'".format(subject))
# HERE is where I'd like to setup a non-blocking timeout that
# will stop Tornado.
# spawn_callback blocks and prevents future from receiving anything.
#ioloop.IOLoop.current().spawn_callback(lambda: timeout_task(timeout))
# Task blocks and prevents future from receiving anything.
yield tornado.gen.Task(timeout_task, timeout)
# Straight attempt at a Thread will block as well.
Delayed(self_stop(), timeout).start()
except Exception, e:
print(e)
show_usage_and_die()
if __name__ == '__main__':
main()
ioloop.IOLoop.instance().start()