4

这将是一个很长的问题,所以:

TL;DR:我有一个带有请求处理程序的 Python 2.7 线程网络服务器,调用堆栈如下所示:

WorkerThread -> requestHandler -> func1 -> func2 -> .. -> func10 -> doStuff -> BlockingIO

我想使用 Tornado 3.0 IOLoop 并只更改服务器和 IO 部分:

(IOLoop) -> requestHandler -> func1 -> func2 -> .. -> func10 -> (doStuff) -> (AsyncIO)

所以 requestHandler() 和 func10() 之间的所有代码栈都不会改变。事实上,即使doStuff() 的接口也不会改变,它会显得阻塞。但是,在内部,它将使用一个 AsyncIO 对象(作为 Tornado 协程),并且在异步 IO 操作期间让给 IOLoop 以执行其他协程,直到 IO 操作完成。

这可能吗?




现在来看一个几乎真实的例子:

我有一个网络服务器,它接收请求并使用线程池(或进程池,就本示例而言无关紧要)处理它们:

def main():

    # Main entry point, called below.

    # Fake class, you can imagine the internals. We register a request 
    # handler here - handleRequest()
    server = ThreadedServer(handler=handleRequest) 

    # Server has a thread pool, each request is handled on a worker thread. 
    # One thread handles network stuff and pushes requests to worker threads
    Server.start()

def handleRequest(server_address):

    # This is the request handler, called in the context of a worker 
    # thread, after a network request was received.

    # We call the function below. It blocks the thread until it finishes.
    # Not very optimal, since the blocking is network IO bound
    result = doStuff(server_address)

    # We use the result somehow, here we print it
    print "Request handled with result: %s" % result

def doStuff(server_address):

    # This is called by the request handler

    # This is a network bound object, most of its time is spent waiting
    # for the network IO
    net_bound_object = NetBoundSyncObject(server_address)

    # This would block, waiting on the network, preventing the thread from 
    # handling other requests
    result = net_bound_object.do_something()

    # We have the result, return it
    return result

if __name__ == "__main__":

    main()

很简单,真的。

现在,假设我决定要重构我的服务器以使用 Tornado,使用 tornado.gen 来支持异步操作,因此不会受到网络 IO 的限制。所以,这是我的新代码:

def main():

    # Start Tornado's IOLoop, first entering TornadoServer.start() to begin
    # initializing the server and accept requests.
    # server.start is a coroutine that waits for network IO, yielding 
    # control back to the IOLoop until something
    # happens. When something does, it is awakened and schedules a 
    # request handler - handleRequest, and goes back to network IO, 
    # yielding control. Thus, handleRequest is called.
    server = TornadoServer(handler=handleRequest) # fake class again
    IOLoop.instance().add_callback(server.start)
    IOLoop.instance().start()

def handleRequest(server_address):

    # This part of the code has not been changed - just the comments.
    # It is now run in the context of an IOLoop callback.

    # We call the function above. The interface remains the same. It also seems
    # to block - which is fine, we want to wait for its result to continue processing.
    # However, we want the IOLoop to continue running somehow.
    result = doStuff(server_address)

    # We use the result somehow, here we print it
    print "Request handled with result: %s" % result            

def doStuff(server_address):

    # This is a network bound object, most of its time is spent waiting for
    # the network IO, however all its methods are coroutines and it yields 
    # while waiting for network IO
    net_bound_object = NetBoundAsyncObject(server_address)

    # Now to the problem.
    # doStuff() is a facade - I don't want it to be a coroutine, I want it to hide
    # the implementation details and keep its previous interface.

    # However, NetBoundAsyncObject.do_something_async() is a coroutine, and calls
    # coroutines inside it. So it should be called in the context of
    # another coroutine:
    result = yield net_bound_object.do_something_async()
    # but this is wrong here, since we are not a coroutine.

    # To properly call it asynchronously, I would need to make doStuff()
    # a coroutine as well, breaking its interface, which would mean that 
    # handleRequest too should now be a coroutine. Not a big change, but imagine
    # that instead of calling doStuff() directly, I had code like:
    # handleRequest -> func1 -> func2 -> func3 -> ... -> func10 -> doStuff
    # so now I'd have to change all these functions to be coroutines as well.

    # All of these functions, handleRequest and func1..10, represent a big stack 
    # of code in my real system which is completely synchronous, CPU bound code, 
    # so it has no IO waits anywhere, just code that needs to be run BEFORE and
    # AFTER the network IO bound code finishes, to properly handle the request. 
    # It is well tested, production proven code that requires no functional change,
    # and that doesn't need to be a coroutine. This would be a big refactor.       

    # In the code as it is now, result is now returned as a Future:
    result = net_bound_object.do_something_async()
    # I want to be able to do something like:
    IOLoop.instance().wait_for_future(result)
    # Letting the IOLoop run and handle other things in the meanwhile, like
    # network requests, and also my asynchronous code. 
    # When it finishes, I want my wait_for_future() to return and to continue
    # execution with the result accessible in the future object.

    # Thus, the changes would be at the top (the TornadoServer vs ThreadedServer)
    # and the bottom (doStuff to use either NetBoundObject or NetBoundAsyncObject),
    # but the middle stack will remain unchanged.

    # Return the result of the operation
    return result

if __name__ == "__main__":

    main()

我知道这在很多方面都是有问题的,主要是因为调用堆栈。当我们做类似的事情时:

IOLoop.instance().wait_for_future(result)

我们有一个如下所示的调用堆栈:

IOLoop.main_loop.start() -> handleRequest -> IOLoop.main_loop.wait_for_future() -> other_callbacks..

所以我们可能(甚至可能)遇到如下情况:

IOLoop.main_loop.start() -> handleRequest -> IOLoop.main_loop.wait_for_future() -> handleRequest -> IOLoop.main_loop.wait_for_future() -> handleRequest -> IOLoop.main_loop.wait_for_future() -> ...

显然,如果handleRequest 本身成为协程,那么当它本身产生时,我们就没有这么深的堆栈问题。

在我曾经使用过的嵌入式系统中,使用非抢占式调度程序,在任何时候都没有问题将控制权返回给调度程序而不会出现堆栈问题。调度程序将获取执行上下文并调用堆栈并存储它们,然后更改为另一个上下文/堆栈并从那里继续执行。在等待事件/IO 时,调度程序将被触发并运行 IO 循环中的任何内容。我想要在我的系统中使用这样的东西,而不是必须更改上面的整个调用堆栈 - 将所有东西都转换为协程。

有没有人有任何提示,任何想法?

4

1 回答 1

4

您可以使用以下方法同步运行 @gen.coroutine 修饰函数:

@gen.coroutine
def main():
    # do stuff...

if __name__ == '__main__':
    IOLoop.instance().run_sync(main)

这将启动“IOLoop”、运行函数并停止循环。 https://github.com/facebook/tornado/blob/master/tornado/ioloop.py

于 2013-11-18T03:09:50.743 回答