1

TLDR:

如何有效地使用dask-distributed将多个支持的数据集写入 AWS S3 上的dask存储?xarrayzarr

详情

我有一个工作流,它采用 S3 上的栅格数据集列表并生成一个 dask-array 支持的 xarray 数据集。

我需要遍历多个组,对于每个组,工作流都会获取属于该组的栅格数据集并生成相应的 xarray 数据集。

现在我想将数据集中的数据写入 S3 上的 zarr 存储(同一个存储,仅使用group参数)。

顺序处理的伪代码如下所示:

client = Client(...) # using a distributed cluster

zarr_store = fsspec.get_mapper("s3://bucket/key.zarr")

for group_select in groups:
    
    xr_dataset = get_dataset_for_group(group_select)
    
    # totally unnecessary, just to illustrate that this is a lazy dataset, nothing has been loaded yet
    assert dask.is_dask_collection(xr_dataset)
    
    xr_dataset.to_zarr(zarr_store, group=group_select)

这很好用,一旦to_zarr执行,数据就会被加载并存储在 S3 上,任务并行运行。


现在我想使用dask.distribuited. 这是我尝试过的以及遇到的问题:

1.用于.to_zarr(..., compute=False)收集延迟任务列表

这原则上有效,但速度很慢。创建一个任务大约需要 3-4 秒,我需要运行 100 多次,在实际开始任何计算之前需要 4-5 分钟。

2.把它包起来dask.delayed

这极大地加快了任务的创建速度,但是写入 zarr 存储并没有在工作人员之间分配,而是处理任务的工作人员在加载任务完成后收集所有数据并将其写入 zarr。

3.包装to_zarr自定义函数并将其传递给client.submit

这看起来是最有希望的选择。我刚刚将to_zarr调用包装在一个自定义函数中,可以从工作人员调用:

def dump(ds, target, group=None):
    with worker_client() as client:
        ds.to_zarr(store=target, group=group)  
    return True

这样做worker_client会将编写任务放回调度程序并解决我在上面遇到的问题dask.delayed

但是,当我按照以下方式重复提交此功能时(我需要这样做 100 多次)

futures = [client.submit(dump, x, target, g) for x,g in zip(datasets, groups)]

我很快用要处理的任务使调度程序不堪重负。

我能想到的唯一明显的解决方案是分批拆分数据集,只有在前一个完成后才开始一个新的。但是没有更优雅的解决方案吗?或者dask(分布式)中是否有内置功能?

4

1 回答 1

1

在我的经验/环境中,调度程序很容易被太多任务(以及太多无法协调的工作人员)压倒,因此将事情分成批次通常是可行的。

要创建移动的工作队列,您可以使用as_completed, 每次完成另一个任务时提交/添加任务。请参阅以下相关答案:12

于 2021-03-23T19:47:39.750 回答