0

我试图让 Tornado 在超时后停止而不阻止任何现有功能。我可能错过了 Tornado 约定,但无论我使用 spawn_callback、Task 还是 Thread,我似乎都在阻止主循环。

首先,我这样做的原因是因为我想在客户端应用程序中使用世界著名的 NATS 消息总线来发布消息(不是通常的直接 HTTP 功能),然后等待订阅的响应。异步行为的典型问题,官方 NATS Python 客户端使用 Tornado,所以我也在尝试使用它。

我怀疑我的问题与理解 tornado.gen.coroutine 装饰器如何与线程一起工作有关。

我的代码片段如下。如果有人注意到我的明显问题,我将不胜感激。谢谢!

class Delayed(Thread):
   def __init__(self, callback=None, timeout=2, *args, **kwargs):
        super(Delayed, self).__init__(*args, **kwargs)
        self.callback = callback
        self.timeout = timeout

   def run(self):
        time.sleep(self.timeout)
        if self.callback != None:
            self.callback()

def timeout_task(timeout_secs=2):
    time.sleep(timeout_secs)
    ioloop.IOLoop.instance().stop()
    yield

@tornado.gen.coroutine
def main():
    parser = argparse.ArgumentParser()

    parser.add_argument('CommandType')
    ...
    parser.add_argument('-s', '--servers', default=[], action='append')
    args = parser.parse_args()

    try:
      timeout=args.wait

      servers = args.servers
      queue = ""
      ...
      if len(args.servers) < 1:
          servers = ["nats://127.0.0.1:4222"]

      data = funct_to_get_data()

      nc = NATS()
      opts = { "servers": servers }
      yield nc.connect(**opts)

      def self_stop():
          ioloop.IOLoop.instance().stop()

      def handler(msg):
          print("[Received: {0}] {1}".format(msg.subject, msg.data))

      print("Subscribed to '{0}'".format(subject))
      future = nc.subscribe(subject, queue, handler)
      sid = future.result()

      yield nc.publish(subject, data)
      yield nc.flush()
      print("Published to '{0}'".format(subject))

      # HERE is where I'd like to setup a non-blocking timeout that
      # will stop Tornado.

      # spawn_callback blocks and prevents future from receiving anything.
      #ioloop.IOLoop.current().spawn_callback(lambda: timeout_task(timeout))

      # Task blocks and prevents future from receiving anything. 
      yield tornado.gen.Task(timeout_task, timeout)

      # Straight attempt at a Thread will block as well.
      Delayed(self_stop(), timeout).start()
    except Exception, e:
      print(e)
      show_usage_and_die()

if __name__ == '__main__':
    main()
    ioloop.IOLoop.instance().start()
4

1 回答 1

0

好的,我最终只是使用 threading.Timer Python 支持来延迟函数调用。这是重要的代码。它可能不是很好的 Python 或 Tornado 的使用,但我现在就使用它。

def self_stop():
    ioloop.IOLoop.instance().stop()

@tornado.gen.coroutine
def main():
    parser = argparse.ArgumentParser()
    ...
    parser.add_argument('-w', '--wait', default=2, type=int)
    ...
    args = parser.parse_args()

    try:
      timeout=args.wait
      t = Timer(timeout, self_stop)
      t.start()
      ...


if __name__ == '__main__':
    main()
    ioloop.IOLoop.instance().start()
于 2017-07-13T14:27:14.280 回答