12

我有一个long_task运行繁重的 cpu-bound 计算的函数,我想通过使用新的 asyncio 框架使其异步。生成的long_task_async函数使用 aProcessPoolExecutor将工作卸载到不受 GIL 约束的不同进程。

问题在于,由于某种原因,当 yield from 时concurrent.futures.Future返回的实例ProcessPoolExecutor.submit会抛出一个TypeError. 这是设计使然吗?asyncio.Future那些期货与阶级不兼容吗?什么是解决方法?

我还注意到生成器不可腌制,因此向 couroutine 提交ProcessPoolExecutor会失败。有什么干净的解决方案吗?

import asyncio
from concurrent.futures import ProcessPoolExecutor

@asyncio.coroutine
def long_task():
    yield from asyncio.sleep(4)
    return "completed"

@asyncio.coroutine
def long_task_async():
    with ProcessPoolExecutor(1) as ex:
        return (yield from ex.submit(long_task)) #TypeError: 'Future' object is not iterable
                                                 # long_task is a generator, can't be pickled

loop = asyncio.get_event_loop()

@asyncio.coroutine
def main():
    n = yield from long_task_async()
    print( n )

loop.run_until_complete(main())
4

2 回答 2

12

您想使用loop.run_in_executor,它使用concurrent.futures执行程序,但将返回值映射到asyncio未来。

最初的asyncioPEP建议 concurrent.futures.Future有朝一日可能会发展出一种__iter__方法,因此它也可以使用yield from,但目前该库已被设计为只需要yield from支持,仅此而已。(否则某些代码实际上在 3.3 中无法工作。)

于 2014-03-17T00:39:27.880 回答
12

我们可以通过调用将 a 包装concurrent.futures.Future成a 。我用下面的代码试过了。工作正常asyncio.futureasyncio.wrap_future(Future)

from asyncio import coroutine
import asyncio
from concurrent import futures


def do_something():
    ls = []
    for i in range(1, 1000000):
        if i % 133333 == 0:
            ls.append(i)
    return ls


@coroutine
def method():
    with futures.ProcessPoolExecutor(max_workers=10) as executor:
        job = executor.submit(do_something)
        return (yield from asyncio.wrap_future(job))

@coroutine
def call_method():
    result = yield from method()
    print(result)


def main():
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(call_method())
    finally:
        loop.close()


if __name__ == '__main__':
    main()
于 2015-07-25T06:02:22.773 回答