14

我正在将asyncio用于网络框架。

在下面的代码中(low_level是我们的低级函数,main块是我们的程序入口,user_func是用户定义的函数):

import asyncio

loop = asyncio.get_event_loop()
""":type :asyncio.AbstractEventLoop"""


def low_level():
    yield from asyncio.sleep(2)


def user_func():
    yield from low_level()


if __name__ == '__main__':
    co = user_func()
    loop.run_until_complete(co)

我想包装low_level正常功能而不是coroutine(forcompatibility等),但low_level在事件循环中。如何将其包装为普通功能?

4

1 回答 1

20

因为low_level是协程,所以只能通过运行asyncio事件循环来使用。如果您希望能够从未运行事件循环的同步代码中调用它,则必须提供一个实际启动事件循环并运行协程直到完成的包装器:

def sync_low_level():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(low_level())

如果您希望能够从作为正在运行的事件循环low_level()一部分的函数调用,让它阻塞两秒钟,但不必使用,答案是您不能。事件循环是单线程的;每当执行在您的一个函数内时,事件循环就会被阻止。不能处理其他事件或回调。在事件循环中运行的函数将控制权交还给事件循环的唯一方法是 1) 2) 使用. 除非您做这两件事之一,否则呼叫将永远无法完成。yield fromreturnyield fromasyncio.sleeplow_level

现在,我想您可以创建一个全新的事件循环,并使用它从作为默认事件循环的一部分运行的协程同步运行睡眠:

import asyncio

loop = asyncio.get_event_loop()

@asyncio.coroutine
def low_level(loop=None):
    yield from asyncio.sleep(2, loop=loop)


def sync_low_level():
    new_loop = asyncio.new_event_loop()
    new_loop.run_until_complete(low_level(loop=new_loop))

@asyncio.coroutine
def user_func():
    sync_low_level()

if __name__ == "__main__":
    loop.run_until_complete(user_func())

但我真的不确定你为什么要这样做。

如果您只是希望能够low_level像返回 a 的方法一样操作Future,那么您可以将回调等附加到它,只需将其包装在asyncio.async()

loop = asyncio.get_event_loop()

def sleep_done(fut):
    print("Done sleeping")
    loop.stop()

@asyncio.coroutine
def low_level(loop=None):
    yield from asyncio.sleep(2, loop=loop)

def user_func():
    fut = asyncio.async(low_level())
    fut.add_done_callback(sleep_done)

if __name__ == "__main__":
    loop.call_soon(user_func)
    loop.run_forever()

输出:

<2 second delay>
"Done sleeping"

此外,在您的示例代码中,您应该对 and 使用装饰@asyncio.coroutine器,如文档中所述:low_leveluser_funcasyncio

协程是遵循某些约定的生成器。出于文档目的,所有协程都应使用@asyncio.coroutine 进行修饰,但这不能严格执行。

编辑:

以下是来自同步 Web 框架的用户如何在不阻塞其他请求的情况下调用您的应用程序:

@asyncio.coroutine
def low_level(loop=None):
    yield from asyncio.sleep(2, loop=loop)

def thr_low_level():
   loop = asyncio.new_event_loop()
   t = threading.Thread(target=loop.run_until_complete, args(low_level(loop=loop),))
   t.start()
   t.join()

如果 Flask 调用正在处理的请求thr_low_level,它将阻塞直到请求完成,但是应该为所有正在进行的异步 I/O 释放 GIL low_level,允许在单独的线程中处理其他请求。

于 2014-08-14T04:28:28.380 回答