3

我正在尝试在实际应用程序中使用 asyncio,但这并不容易,非常需要 asyncio 专家的帮助。

在没有淹没事件循环的情况下产生其他任务的任务(成功!)

考虑一个任务,比如从一些“播种”网页开始抓取网络。每个网页都会以指数(!)的方式生成新的下载任务。然而,我们既不想让事件循环泛滥,也不想让我们的网络过载。我们想控制任务流程。这是我通过修改此处提出的漂亮的 Maxime 解决方案而取得的成就: https ://mail.python.org/pipermail/python-list/2014-July/687823.html

映射和减少(失败)

好吧,但如果我们已经在 python3 上,我还需要一个非常自然的东西,一种 map() 和 reduce() 或 functools.reduce()。也就是说,我需要为页面链接上完成的所有下载任务调用“汇总”函数。这是我失败的地方:(

我会提出一个过于简单但仍然很好的测试来模拟用例:让我们以无效的形式使用斐波那契函数实现。也就是说,让 coro_sum() 应用在 reduce() 中,而 coro_fib 就是我们在 map() 中应用的。像这样的东西:

@asyncio.coroutine
def coro_sum(x):
    return sum(x)

@asyncio.coroutine
def coro_fib(x):
    if x < 2:
        return 1
    res_coro =
executor_pool.spawn_task_when_arg_list_of_coros_ready(coro=coro_sum,

 arg_coro_list=[coro_fib(x - 1), coro_fib(x - 2)])
    return res_coro

这样我们就可以运行以下测试。

对一名工人进行测试#1:

executor_pool = ExecutorPool(workers=1)
executor_pool.as_completed( coro_fib(x) for x in range(20) )

对两名工人进行测试#2:

executor_pool = ExecutorPool(workers=2)
executor_pool.as_completed( coro_fib(x) for x in range(20) )

非常重要的是,每个 coro_fib() 和 coro_sum() 调用都是通过某个 worker 上的 Task 完成的,而不仅仅是隐式产生和不受管理的!

找到对这个非常自然的目标感兴趣的 asyncio 大师会很酷。您的帮助和想法将不胜感激。

此致

瓦莱里

4

1 回答 1

1

多种方法可以异步计算斐波那契数列。首先,检查爆炸变体在您的情况下是否失败:

@asyncio.coroutine
def coro_sum(summands):
    return sum(summands)

@asyncio.coroutine
def coro_fib(n):
    if   n == 0: s = 0
    elif n == 1: s = 1
    else:
        summands, _ = yield from asyncio.wait([coro_fib(n-2), coro_fib(n-1)])
        s = yield from coro_sum(f.result() for f in summands)
    return s

您可以替换summands为:

a = yield from coro_fib(n-2) # don't return until its ready
b = yield from coro_fib(n-1)
s = yield from coro_sum([a, b])

一般来说,为了防止指数增长,您可以使用asyncio.Queue通过通信同步),asyncio.Semaphore使用互斥锁同步)原语。

于 2015-01-31T13:37:32.497 回答