想象一下,我有一个包含 10 个工人和 40 个核心的 dask 网格。这是一个共享网格,所以我不想让我的工作完全饱和。我有 1000 个任务要做,我想一次提交(并积极运行)最多 20 个任务。
具体来说,
from time import sleep
from random import random
def inc(x):
from random import random
sleep(random() * 2)
return x + 1
def double(x):
from random import random
sleep(random())
return 2 * x
>>> from distributed import Executor
>>> e = Executor('127.0.0.1:8786')
>>> e
<Executor: scheduler=127.0.0.1:8786 workers=10 threads=40>
如果我设置一个队列系统
>>> from queue import Queue
>>> input_q = Queue()
>>> remote_q = e.scatter(input_q)
>>> inc_q = e.map(inc, remote_q)
>>> double_q = e.map(double, inc_q)
这会起作用,但是,这只会将我的所有任务转储到网格中,使其饱和。理想情况下,我可以:
e.scatter(input_q, max_submit=20)
似乎这里文档中的示例允许我使用maxsize
队列。但从用户的角度来看,我仍然需要处理背压。理想情况下 dask
会自动处理这个问题。