12

想象一下,我有一个包含 10 个工人和 40 个核心的 dask 网格。这是一个共享网格,所以我不想让我的工作完全饱和。我有 1000 个任务要做,我想一次提交(并积极运行)最多 20 个任务。

具体来说,

from time import sleep
from random import random

def inc(x):
    from random import random
    sleep(random() * 2)
    return x + 1

def double(x):
    from random import random
    sleep(random())
    return 2 * x

>>> from distributed import Executor
>>> e = Executor('127.0.0.1:8786')
>>> e
<Executor: scheduler=127.0.0.1:8786 workers=10 threads=40>

如果我设置一个队列系统

>>> from queue import Queue
>>> input_q = Queue()
>>> remote_q = e.scatter(input_q)
>>> inc_q = e.map(inc, remote_q)
>>> double_q = e.map(double, inc_q)

这会起作用,但是,这只会将我的所有任务转储到网格中,使其饱和。理想情况下,我可以:

e.scatter(input_q, max_submit=20)

似乎这里文档中的示例允许我使用maxsize队列。但从用户的角度来看,我仍然需要处理背压。理想情况下 dask会自动处理这个问题。

4

2 回答 2

8

利用maxsize=

你很亲密。所有scatter,gather和都map采用相同的maxsize=关键字参数Queue。所以一个简单的工作流程可能如下:

例子

from time import sleep

def inc(x):
    sleep(1)
    return x + 1

your_input_data = list(range(1000))

from queue import Queue              # Put your data into a queue
q = Queue()
for i in your_input_data:
    q.put(i)

from dask.distributed import Executor
e = Executor('127.0.0.1:8786')        # Connect to cluster


futures = e.map(inc, q, maxsize=20)  # Map inc over data
results = e.gather(futures)          # Gather results

L = []
while not q.empty() or not futures.empty() or not results.empty():
    L.append(results.get())  # this blocks waiting for all results

所有qfuturesresults都是 Python Queue 对象。和队列没有限制,所以他们会尽可能多地贪婪地拉进来qresults然而,futures队列的最大大小为 20,因此在任何给定时间它只会允许 20 个期货在运行。一旦领先的未来完成,它将立即被收集函数消耗,其结果将被放入results队列中。这释放了空间futures并导致提交另一个任务。

请注意,这并不是您想要的。这些队列是有序的,因此期货只会在它们位于队列的前面时才会弹出。如果除第一个之外的所有飞行中的期货都已完成,它们仍将留在队列中,占用空间。鉴于此限制,您可能希望选择maxsize=比您想要的项目稍多的20项目。

扩展这个

在这里,我们做了一个简单的map->gather管道,中间没有逻辑。您还可以将其他map计算放在此处,甚至将期货从队列中拉出并自行使用它们进行自定义工作。很容易打破上面提供的模具。

于 2016-08-12T15:40:15.760 回答
1

发布在 github 上的解决方案非常有用 - https://github.com/dask/distributed/issues/864

解决方案:

inputs = iter(inputs)
futures = [c.submit(func, next(inputs)) for i in range(maxsize)]
ac = as_completed(futures)

for finished_future in ac:
    # submit new future 
    try:
        new_future = c.submit(func, next(inputs))
        ac.append(new_future)
    except StopIteration:
        pass
    result = finished_future.result() 
    ... # do stuff with result

询问:

但是,为了确定可以自由限制任务的工作人员,我正在尝试使用 client.has_what() api。似乎工人的负载并没有像状态 UI 页面上显示的那样立即得到反映。有时,has_what 反映任何数据需要相当长的时间。

是否有另一个 api 可用于确定空闲工作人员的数量,然后可用于确定类似于 UI 正在使用的油门范围。

于 2018-03-27T22:12:48.673 回答