0

我想我遗漏了一些东西(仍然是 Dask Noob),但我正在尝试批处理建议以避免从这里执行过多的 Dask 任务:

https://docs.dask.org/en/latest/delayed-best-practices.html

并且不能让它们工作。这是我尝试过的:

import dask

def f(x):
    return x*x

def batch(seq):
    sub_results = []
    for x in seq:
        sub_results.append(f(x))
    return sub_results

batches = []
for i in range(0, 1000000000, 1000000):
 result_batch = dask.delayed(batch, range(i, i + 1000000))
 batches.append(result_batch)

批次现在包含延迟对象:

batches[:3]

[Delayed(range(0, 1000000)),
 Delayed(range(1000000, 2000000)),
 Delayed(range(2000000, 3000000))]

但是当我计算它们时,我得到批处理函数指针(我认为??):

results = dask.compute(*batches)
results[:3]

(<function __main__.batch(seq)>,
 <function __main__.batch(seq)>,
 <function __main__.batch(seq)>)

我有两个问题:

  1. 这真的应该如何运行,因为它似乎与页面的第一行相矛盾,因为它立即Best practices运行而不是懒惰。delayed(f(x))

  2. 如何获得上述批量运行的结果?

4

1 回答 1

1

看起来您的代码缺少一对括号。不确定这是否是一个错字(???)。

根据文档中的示例,我认为您想要

result_batch = dask.delayed(batch)(range(i, i + 1000000))

我替换batch, ran...batch)(ran...,因为对batch()函数的调用应该被延迟。

答案

  1. 修正错字后,您的代码对我来说可以正常工作 - 现在计算将被延迟。关于文档开头所写的内容 - 使用dask.delayed. 对dask.delayed( batch(range(i, i + 1000000)) )函数的调用batch(...)不会被延迟,因此它会立即运行。这是因为函数的输出已经被包裹在dask.delayed中,所以输出(结果)会被延迟,这不是我们想要的工作流程。但是,dask.delayed(batch)(range(i, i + 1000000))延迟了对函数的调用(因为在这里,dask.delayed包装了函数本身)。我相信这就是文档在最佳实践部分开始时想要说的。
  2. 同样,修正错字后,您的代码将按预期运行,并将冗长的输出打印到屏幕上。
于 2020-12-04T23:10:22.800 回答