3

我对龙卷风很陌生。只是在看看如何处理在 Tornado 中阻塞的请求。我在单独的线程中运行阻塞代码。然而,主线程仍然阻塞,直到线程函数完成。我没有在这里使用 gen.coroutine,但是已经尝试过了,结果是一样的

counter = 0

def run_async(func):
    @wraps(func)
    def function_in_a_thread(*args, **kwargs):
        func_t = Thread(target=func, args=args, kwargs=kwargs)
        func_t.start()
    return function_in_a_thread

def long_blocking_function(index, sleep_time, callback):
    print "Entering run counter:%s" % (index,)
    time.sleep(sleep_time)
    print "Exiting run counter:%s" % (index,)

    callback('keyy' + index)


class FooHandler(tornado.web.RequestHandler):

    @web.asynchronous
    def get(self):

        global counter
        counter += 1
        current_counter = str(counter)

        print "ABOUT to spawn thread for counter:%s" % (current_counter,)
        long_blocking_function(
            index=current_counter,
            sleep_time=5, callback=self.done_waiting)


        print "DONE with the long function"

    def done_waiting(self, response):
        self.write("Whatever %s " % (response,))
        self.finish()


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [(r"/foo", FooHandler),
                    ]

        settings = dict(
            debug=True,
        )

        tornado.web.Application.__init__(self, handlers, **settings)


def main():
    application = Application()
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main()

当我背靠背发出请求时,FooHandler 会阻塞并且在 long_blocking_function 完成之前不会收到任何请求。所以我最终看到了类似的东西

ABOUT to spawn thread for counter:1
Entering run counter:1
Exiting run counter:1
DONE with the long function
ABOUT to spawn thread for counter:2
Entering run counter:2
Exiting run counter:2
DONE with the long function
ABOUT to spawn thread for counter:3
Entering run counter:3
Exiting run counter:3
DONE with the long function

我期待着这些方面的东西(因为我在第一次调用 long_blocking_function 完成之前发出多个请求)但我只看到类似于上面的跟踪

ABOUT to spawn thread for counter:1
DONE with the long function
ABOUT to spawn thread for counter:2
DONE with the long function
ABOUT to spawn thread for counter:3
DONE with the long function
ABOUT to spawn thread for counter:4
DONE with the long function

我查看了Tornado 阻止异步请求并尝试了这两种解决方案。但是当我使用对同一个处理程序的背靠背请求运行它们时,它们都被阻塞了。有人能弄清楚我做错了什么吗?我知道龙卷风在多线程方面表现不佳,但我应该能够以非阻塞方式从中运行一个新线程。

4

3 回答 3

5

Tornado 与concurrent.futures库(有可用的Python 2.x 反向端口)配合得很好,因此您可以使用ThreadPoolExecutor将长时间运行的请求移交给线程池。

这种技术效果很好——我们用它来处理长时间运行的数据库操作。当然,在现实世界中,您还希望以稳健和优雅的方式处理超时和其他异常,但我希望这个示例足以说明这个想法。

def long_blocking_function(index, sleep_time, callback):
    print ("Entering run counter:%s" % (index,))
    time.sleep(sleep_time)
    print ("Exiting run counter:%s" % (index,))
    return "Result from %d" % index


class FooHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        global counter
        counter += 1
        current_counter = str(counter)

        print ("ABOUT to spawn thread for counter:%s" % (current_counter,))
        result = yield self.executor.submit(long_blocking_function,
                                            index=current_counter,
                                            sleep_time=5)
        self.write(result)
        print ("DONE with the long function")
于 2013-10-15T12:48:50.567 回答
1

当您将任务卸载到另一个线程时,您会在任务完成后调用的事件循环中注册一个回调。答案在这个线程中:How to make a library asynchronous in python

问候米。

于 2013-10-14T14:11:21.633 回答
0

您只是忘记实际使用run_async您定义的装饰器。

import tornado.web
import functools
import threading
import time

counter = 0
def run_async(func):
    @functools.wraps(func)
    def function_in_a_thread(*args, **kwargs):
        func_t = threading.Thread(target=func, args=args, kwargs=kwargs)
        func_t.start()
    return function_in_a_thread


@run_async
def long_blocking_function(index, sleep_time, callback):
    print ("Entering run counter:%s" % (index,))
    time.sleep(sleep_time)
    print ("Exiting run counter:%s" % (index,))
    callback('keyy' + index)

class FooHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        global counter
        counter += 1
        current_counter = str(counter)

        print ("ABOUT to spawn thread for counter:%s" % (current_counter,))
        long_blocking_function(
            index=current_counter,
            sleep_time=5, callback=self.done_waiting)
        print ("DONE with the long function")

    def done_waiting(self, response):
        self.write("Whatever %s " % (response,))
        self.finish()


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [(r"/foo", FooHandler),
                    ]

        settings = dict(
            debug=True,
        )

        tornado.web.Application.__init__(self, handlers, **settings)


def main():
    application = Application()
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main()
于 2013-10-11T23:06:27.173 回答