
Background

I'm using dask to manage tens of thousands, sometimes hundreds of thousands, of jobs, each of which involves reading zarr data, transforming it in some way, and writing out the output (one output per job). I'm working on a pangeo/daskhub-style JupyterHub Kubernetes cluster with Dask Gateway.

The data can't be aligned into one giant dask-backed xarray DataArray; instead, our common usage pattern is simply to map a function over the jobs with the dask.distributed Client.map. Each individual task fits in memory.

The problem is that for certain operations, including reading zarr arrays, xarray automatically schedules each I/O operation with the cluster scheduler, even when the operation is invoked from a remote worker. This multiplies the number of tasks the scheduler has to manage, sometimes by a large factor. If a zarr array has many chunks, the dask scheduler will sometimes try to distribute the reads, causing a lot of network traffic within the cluster, and progress can grind to a halt as many tasks queue up to read their chunks at the same time.
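
As far as I can tell this happens because a cluster client is discoverable from inside the task, so dask-backed operations send their graphs back to the cluster scheduler rather than executing locally. A minimal sketch of how that can be checked (the helper name is mine, and I'm assuming dd.get_client() raises ValueError when no client is discoverable):

import dask.distributed as dd

def check_for_cluster_client(_):
    # Inside a task running on a worker, dd.get_client() finds the cluster's
    # client, which is what dask-backed xarray operations end up using.
    try:
        return dd.get_client().scheduler.address
    except ValueError:
        return None  # no client found; dask would fall back to a local scheduler

client = dd.Client()
print(client.submit(check_for_cluster_client, None).result())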

It's entirely possible that the right answer here is "don't do that," e.g. use something like Argo or Kubeflow for jobs this large and complex. But I'd like to see whether anyone has ideas for making this work with dask.

Question

My question is essentially whether it's possible to prevent xarray (or another library with native dask support) from using the cluster's scheduler when running inside a dask.distributed task.

I think the ideal I'm picturing might look something like this:

def mappable_task(args):
    input_fp, output_fp = args

    # goal would be for all of the code within this block to operate as if the
    # cluster scheduler did not exist, to ensure data is read locally and the
    # additional tasks created by reading the zarr array do not bog down the 
    # cluster scheduler.
    with dd.disable_scheduler():
        
        # in our workflow we're reading/writing to google cloud storage using
        # gcsfs.GCSFileSystem.get_mapper
        # https://gcsfs.readthedocs.io/en/latest/api.html
        # Also, we sometimes will only be reading a small portion of the
        # data, or are combining multiple datasets. Just noting that this
        # may involve many scheduled operations per `mappable_task` call
        ds = xr.open_zarr(input_fp).load()
        
        # some long-running operation on the data, which depending on our
        # use case has run from nonlinear transforms to geospatial ops to
        # calling hydrodynamics models
        res = ds * 2

        res.to_zarr(output_fp)

def main():
    JOB_SIZE = 10_000
    jobs = [(f'/infiles/{i}.zarr', f'/outfiles/{i}.zarr') for i in range(JOB_SIZE)]
    client = dd.Client()
    futures = client.map(mappable_task, jobs)
    dd.wait(futures)

I'm not sure whether this would involve changing the behavior of xarray, zarr, dd.get_client(), or something else.
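
The closest existing knobs I can think of are dask's scheduler config and xarray's chunks argument. Below is a sketch of the kind of thing I mean (the function name is just illustrative), assuming that dask.config.set(scheduler='synchronous') takes precedence over the worker's client for graphs built inside the block, and that chunks=None makes open_zarr return numpy-backed (non-dask) variables:

import dask
import xarray as xr

def mappable_task_local(args):
    input_fp, output_fp = args

    # Assumption: this config setting forces any dask graph built inside the
    # block onto the local synchronous scheduler instead of the cluster.
    with dask.config.set(scheduler='synchronous'):
        # Assumption: chunks=None asks xarray to skip dask entirely, so no
        # per-chunk tasks are created in the first place.
        ds = xr.open_zarr(input_fp, chunks=None).load()
        res = ds * 2
        res.to_zarr(output_fp)

I haven't confirmed that this covers our cases where we only read a small portion of a store or combine multiple datasets.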

MRE

The above can be adapted into a testable example. The goal is to see no tasks other than the main mapped function. I ran the following in a JupyterLab IPython notebook and watched the tasks with dask-labextension (the scheduler dashboard shows the same thing).

Imports

import xarray as xr
import dask.distributed as dd
import numpy as np
import os
import datetime
import shutil

Test file setup

shutil.rmtree('infiles', ignore_errors=True)
shutil.rmtree('outfiles', ignore_errors=True)

os.makedirs('infiles', exist_ok=True)
os.makedirs('outfiles', exist_ok=True)

# create two Zarr stores, each with 1000 chunks. This isn't an uncommon
# structure, though each chunk would normally have far more data
for i in range(2):
    ds = xr.Dataset(
        {'var1': (('dim1', 'dim2', ), np.random.random(size=(1000, 100)))},
        coords={'dim1': np.arange(1000), 'dim2': np.arange(100)},
    ).chunk({'dim1': 1})
    
    ds.to_zarr(f'infiles/data_{i}.zarr')

Function definition

def mappable_task(args):
    input_fp, output_fp = args
        
    # in our workflow we're reading/writing to google cloud storage using
    # gcsfs.GCSFileSystem.get_mapper
    # https://gcsfs.readthedocs.io/en/latest/api.html
    ds = xr.open_zarr(input_fp).load()

    # some long-running operation on the data, which depending on our
    # use case has run from nonlinear transforms to geospatial ops to
    # calling hydrodynamics models
    res = ds * 2

    res.to_zarr(output_fp)

Create the client and view the dashboard

client = dd.Client()
client

Map the jobs

JOB_SIZE = 2
jobs = [(f'infiles/data_{i}.zarr', f'outfiles/out_{i}.zarr') for i in range(JOB_SIZE)]

futures = client.map(mappable_task, jobs)
dd.wait(futures);

Cleanup (if running again)

shutil.rmtree('outfiles', ignore_errors=True)
os.makedirs('outfiles', exist_ok=True)

# refresh the client (in case of running multiple times)
client.restart()

Full cleanup

shutil.rmtree('infiles', ignore_errors=True)
shutil.rmtree('outfiles', ignore_errors=True)
client.close();

Note that even though there are only two jobs, thousands of tasks get scheduled.
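
To put a number on it, this is the kind of thing I'd swap in for the "Map the jobs" cell above (a sketch, assuming dd.get_task_stream records one entry per task the workers execute):

JOB_SIZE = 2
jobs = [(f'infiles/data_{i}.zarr', f'outfiles/out_{i}.zarr') for i in range(JOB_SIZE)]

# If only the mapped function were scheduled, len(ts.data) should be close to
# JOB_SIZE; with the per-chunk fan-out it is in the thousands instead.
with dd.get_task_stream() as ts:
    futures = client.map(mappable_task, jobs)
    dd.wait(futures)

print(f"{len(ts.data)} tasks executed for {JOB_SIZE} jobs")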

I'm using a conda environment that includes the following (among many other packages):

dask                      2.30.0                     py_0    conda-forge
dask-gateway              0.9.0            py38h578d9bd_0    conda-forge
dask-labextension         3.0.0                      py_0    conda-forge
jupyter-server-proxy      1.5.0                      py_0    conda-forge
jupyter_client            6.1.7                      py_0    conda-forge
jupyter_core              4.7.0            py38h578d9bd_0    conda-forge
jupyter_server            1.1.3            py38h578d9bd_0    conda-forge
jupyter_telemetry         0.1.0              pyhd8ed1ab_1    conda-forge
jupyterhub                1.2.2            py38h578d9bd_0    conda-forge
jupyterhub-base           1.2.2            py38h578d9bd_0    conda-forge
jupyterlab                2.2.9                      py_0    conda-forge
jupyterlab_server         1.2.0                      py_0    conda-forge
jupyterlab_widgets        1.0.0              pyhd8ed1ab_1    conda-forge
kubernetes                1.18.8                        0    conda-forge
kubernetes-client         1.18.8               haa36a5b_0    conda-forge
kubernetes-node           1.18.8               haa36a5b_0    conda-forge
kubernetes-server         1.18.8               haa36a5b_0    conda-forge
nb_conda_kernels          2.3.1            py38h578d9bd_0    conda-forge
nbclient                  0.5.1                      py_0    conda-forge
nodejs                    15.2.1               h914e61d_0    conda-forge
notebook                  6.1.6            py38h578d9bd_0    conda-forge
numpy                     1.19.4           py38hf0fd68c_1    conda-forge
pandas                    1.1.5            py38h51da96c_0    conda-forge
python                    3.8.6           h852b56e_0_cpython    conda-forge
python-kubernetes         11.0.0           py38h32f6830_0    conda-forge
xarray                    0.16.2             pyhd8ed1ab_0    conda-forge
zarr                      2.6.1              pyhd8ed1ab_0    conda-forge