我想我遗漏了一些东西(仍然是 Dask Noob),但我正在尝试批处理建议以避免从这里执行过多的 Dask 任务:
https://docs.dask.org/en/latest/delayed-best-practices.html
并且不能让它们工作。这是我尝试过的:
import dask
def f(x):
return x*x
def batch(seq):
sub_results = []
for x in seq:
sub_results.append(f(x))
return sub_results
batches = []
for i in range(0, 1000000000, 1000000):
result_batch = dask.delayed(batch, range(i, i + 1000000))
batches.append(result_batch)
批次现在包含延迟对象:
batches[:3]
[Delayed(range(0, 1000000)),
Delayed(range(1000000, 2000000)),
Delayed(range(2000000, 3000000))]
但是当我计算它们时,我得到批处理函数指针(我认为??):
results = dask.compute(*batches)
results[:3]
(<function __main__.batch(seq)>,
<function __main__.batch(seq)>,
<function __main__.batch(seq)>)
我有两个问题:
这真的应该如何运行,因为它似乎与页面的第一行相矛盾,因为它会立即
Best practices
运行而不是懒惰。delayed(f(x))
如何获得上述批量运行的结果?