dask.compute(...) 应该是一个阻塞调用。但是,当我嵌套了 dask.compute,而内部的执行 I/O(如 dask.dataframe.read_parquet)时,内部的 dask.compute 不会阻塞。这是一个伪代码示例:
import dask, distributed
def outer_func(name):
files = find_files_for_name(name)
df = inner_func(files).compute()
# do work with df
return result
def inner_func(files):
tasks = [ dask.dataframe.read_parquet(f) for f in files ]
tasks = dask.dataframe.concat(tasks)
return tasks
client = distributed.Client(scheduler_file=...)
results = dask.compute([ dask.delay(outer_func)(name) for name in names ])
如果我启动 2 个工作人员,每个工作人员有 8 个进程,例如:
dask-worker --scheduler-file $sched_file --nprocs 8 --nthreads 1
,那么我预计最多 2 x 8 个并发的 inner_func 正在运行,因为 inner_func(files).compute() 应该是阻塞的。然而,我观察到的是,在一个工作进程中,一旦它开始 read_parquet 步骤,可能会有另一个 inner_func(files).compute() 开始运行。所以最后可能会有多个 inner_func(files).compute() 运行,有时它可能会导致内存不足错误。
这是预期的行为吗?如果是这样,有什么方法可以强制每个工作进程执行一个 inner_func(files).compute() 吗?