2

我的设置是 python tornado 服务器,它使用ThreadPoolExecutor. 在某些情况下,任务可能会变成无限循环。使用with_timeout装饰器,我设法捕获了超时异常并将错误结果返回给客户端。问题是任务仍在后台运行。如何阻止任务在 中运行ThreadPoolExecutor?或者可以取消Future吗?这是重现问题的代码。使用 tornado 4 和 concurrent.futures 库运行代码并转到http://localhost:8888/test

from tornado.concurrent import run_on_executor
from tornado.gen import with_timeout
from tornado.ioloop import IOLoop
import tornado.web
from tornado import gen
from concurrent.futures import ThreadPoolExecutor
import datetime
MAX_WAIT_SECONDS = 10

class MainHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(2)

    @run_on_executor
    def test_func(self):
        ...
        #infinite loop might be here
        ...

    @tornado.gen.coroutine
    def get(self):
        future = self.test_func()
        try:
            result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
            self.write({'status' : 0})
            self.finish()
        except Exception, e:
            #how to cancel the task here if it was timeout
            future.cancel() # <-- Does not work
            self.write({'status' : 100})
            self.finish()

application = tornado.web.Application([
    (r"/test", MainHandler),
])
application.listen(8888)
IOLoop.instance().start()
4

1 回答 1

2

Future实例本身一旦实际执行就不能被取消,只有在它们处于挂起状态时才能被取消。这在文档中有所说明:

取消()

尝试取消通话。如果调用当前正在执行并且无法取消,则该方法将返回False,否则该调用将被取消并且该方法将返回True

因此,中止您在后台运行的方法的唯一方法是将逻辑实际插入到您的潜在无限循环中,以便在您告诉它时可以中止它。在您的示例中,您可以使用threading.Event

class MainHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(2)

    @run_on_executor
    def test_func(self, event):
        i = 0
        while not event.is_set():
            print i
            i = i + 1

    @tornado.gen.coroutine
    def get(self):
        event = threading.Event()
        future = self.test_func(event)
        try:
            result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
            self.write({'status' : 0})
            self.finish()
        except Exception, e:
            future.cancel() # Might not work, depending on how busy the Executor is
            event.set()
            self.write({'status' : 100})
            self.finish()

application = tornado.web.Application([
    (r"/test", MainHandler),
])
于 2015-03-10T15:22:00.703 回答