4

我有一个带有两个处理程序的简单 aiohttp-server。async for第一个在循环中进行一些计算。第二个只是返回文本响应。not_so_long_operation返回具有最慢递归实现的第 30 个斐波那契数,大约需要一秒钟。

def not_so_long_operation():
    return fib(30)

class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            return i
        else:
            raise StopAsyncIteration

# GET /
async def index(request):
    print('request!')
    l = []
    async for i in arange(20):
        print(i)
        l.append(not_so_long_operation())

    return aiohttp.web.Response(text='%d\n' % l[0])

# GET /lol/
async def lol(request):
    print('request!')
    return aiohttp.web.Response(text='just respond\n')

当我尝试获取/then/lol/时,它只会在第一个完成时给我第二个响应。
我在做什么错以及如何让索引处理程序在每次迭代时释放 ioloop?

4

3 回答 3

4

您的示例没有用于在任务之间切换的屈服点(语句)。await异步迭代器允许在/await内部使用,但不要将其自动插入到您的代码中。__aiter____anext__

说,

class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            await asyncio.sleep(0)  # insert yield point
            return i
        else:
            raise StopAsyncIteration

应该按您的预期工作。

在实际应用程序中,您很可能不需要await asyncio.sleep(0)调用,因为您将等待数据库访问和类似活动。

于 2015-11-05T08:39:17.877 回答
3

由于fib(30)受 CPU 限制并共享少量数据,因此您可能应该使用 a ProcessPoolExecutor(而不是 a ThreadPoolExecutor):

async def index(request):
    loop = request.app.loop
    executor = request.app["executor"]
    result = await loop.run_in_executor(executor, fib, 30)
    return web.Response(text="%d" % result)

executor创建时设置app

app = Application(...)
app["exector"] = ProcessPoolExector()
于 2015-11-04T23:40:43.627 回答
2

这里并不真正需要异步迭代器。相反,您可以简单地将控制权交还给循环内的事件循环。在 python 3.4 中,这是通过使用简单的yield

@asyncio.coroutine
def index(self):
    for i in range(20):
        not_so_long_operation()
        yield

在 python 3.5 中,您可以定义一个Empty基本上做同样事情的对象:

class Empty:
    def __await__(self):
        yield

然后将其与await语法一起使用:

async def index(request):
    for i in range(20):
        not_so_long_operation()
        await Empty()

或者干脆使用最近优化过的asyncio.sleep(0)

async def index(request):
    for i in range(20):
        not_so_long_operation()
        await asyncio.sleep(0)

您还可以使用默认执行程序not_so_long_operation在线程中运行:

async def index(request, loop):
    for i in range(20):
        await loop.run_in_executor(None, not_so_long_operation)
于 2015-11-04T13:41:51.660 回答