0

dask.compute(...) 应该是一个阻塞调用。但是,当我嵌套了 dask.compute,而内部的执行 I/O(如 dask.dataframe.read_parquet)时,内部的 dask.compute 不会阻塞。这是一个伪代码示例:

import dask, distributed

def outer_func(name):
    files = find_files_for_name(name)
    df = inner_func(files).compute()
    # do work with df
    return result

def inner_func(files):
    tasks = [ dask.dataframe.read_parquet(f) for f in files ]
    tasks = dask.dataframe.concat(tasks)
    return tasks

client = distributed.Client(scheduler_file=...)
results = dask.compute([ dask.delay(outer_func)(name) for name in names ])

如果我启动 2 个工作人员,每个工作人员有 8 个进程,例如:

dask-worker --scheduler-file $sched_file --nprocs 8 --nthreads 1

,那么我预计最多 2 x 8 个并发的 inner_func 正在运行,因为 inner_func(files).compute() 应该是阻塞的。然而,我观察到的是,在一个工作进程中,一旦它开始 read_parquet 步骤,可能会有另一个 inner_func(files).compute() 开始运行。所以最后可能会有多个 inner_func(files).compute() 运行,有时它可能会导致内存不足错误。

这是预期的行为吗?如果是这样,有什么方法可以强制每个工作进程执行一个 inner_func(files).compute() 吗?

4

2 回答 2

1

当您要求 dask 分布式调度程序运行工作时,它会将函数的代码和所需的任何数据发送到不同进程中的工作函数,可能在不同的机器上。这些工作进程忠实地执行函数,并像正常的 python 代码一样运行。关键是,正在运行的函数不知道它在 dask worker 上 - 默认情况下,它会看到没有设置全局 dask 分布式客户端,并执行 dask 通常在这种情况下会做的事情:执行任何 dask默认调度程序(线程调度程序)上的工作负载。

如果您确实必须在任务中执行完整的 dask-compute 操作,并希望这些操作使用运行这些任务的分布式调度程序,您将需要使用worker 客户端。但是,我觉得在您的情况下,改写作业以删除嵌套(类似于上面的伪代码,尽管这也可以与计算一起使用)可能是更简单的方法。

于 2017-08-17T17:54:23.423 回答
0

多进程调度程序似乎不是这种情况。

为了使用分布式调度程序,我通过distributed.Client API 使用有节奏的作业提交而不是依赖dask.compute 找到了解决方法。dask.compute 适用于简单的用例,但显然不知道可以安排多少未完成的任务,因此在这种情况下会超出系统。

这是运行 dask.Delayed 任务集合的伪代码:

import distributed as distr

def paced_compute(tasks, batch_size, client):
    """
    Run delayed tasks, maintaining at most batch_size running at any
    time. After the first batch is submitted,
    submit a new job only after an existing one is finished, 
    continue until all tasks are computed and finished.

    tasks: collection of dask.Delayed
    client: distributed.Client obj
    """
    results, tasks = [], list(tasks)
    working_futs = client.compute(tasks[:batch_size])
    tasks = tasks[batch_size:]
    ac = distr.as_completed(working_futs)
    for fut in ac:
        res = fut.result()
        results.append(res)
        if tasks:
            job = tasks.pop()
            ac.add(client.compute(job))
    return results
于 2017-08-10T19:49:46.857 回答