Background
I'm using dask to manage tens of thousands, sometimes hundreds of thousands, of jobs, each of which involves reading zarr data, transforming it in some way, and writing out an output (one output per job). I'm on a pangeo/daskhub-style JupyterHub Kubernetes cluster with Dask Gateway.
The data can't be aligned into one giant dask-backed xarray DataArray; instead, our common usage pattern is simply to use dask distributed's Client.map to map a function over the jobs. Each task can run in memory.
The problem is that for some operations, including reading zarr arrays, xarray automatically schedules each I/O operation on the cluster scheduler, even when the operation is invoked from a remote worker. This multiplies the number of tasks the scheduler has to manage, sometimes by a large factor. If a zarr array has many chunks, the dask scheduler will sometimes try to distribute the reads, causing heavy network traffic within the cluster, which can grind progress to a halt as many tasks queue up to read their chunks.
It's entirely possible the right answer here is "don't do that", e.g. use something like Argo or Kubeflow for jobs this large and complex. But I'd like to see whether anyone has ideas for how to make this work with dask.
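To make the fan-out concrete: opening even one store already produces a dask graph with roughly one task per zarr chunk. This is just how I've been eyeballing it (the path and variable name are the ones from the MRE further down), not a rigorous measurement:

import xarray as xr

# opening a single store lazily gives dask-backed variables: roughly one read
# task per zarr chunk, before any actual computation is layered on top
ds = xr.open_zarr('infiles/data_0.zarr')   # path from the MRE below
print(ds['var1'].chunks)                   # 1000 chunks along dim1
print(len(ds['var1'].__dask_graph__()))    # on the order of 1000 tasks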
Question
My question is essentially whether it's possible to block xarray (or another library with native dask support) from using the cluster's scheduler when running inside a dask.distributed task.
I think my ideal would look something like this:
def mappable_task(args):
    input_fp, output_fp = args

    # goal would be for all of the code within this block to operate as if the
    # cluster scheduler did not exist, to ensure data is read locally and the
    # additional tasks created by reading the zarr array do not bog down the
    # cluster scheduler.
    with dd.disable_scheduler():

        # in our workflow we're reading/writing to google cloud storage using
        # gcsfs.GCSFileSystem.get_mapper
        # https://gcsfs.readthedocs.io/en/latest/api.html
        # Also, we sometimes will only be reading a small portion of the
        # data, or are combining multiple datasets. Just noting that this
        # may involve many scheduled operations per `mappable_task` call
        ds = xr.open_zarr(input_fp).load()

        # some long-running operation on the data, which depending on our
        # use case has run from nonlinear transforms to geospatial ops to
        # calling hydrodynamics models
        res = ds * 2

        res.to_zarr(output_fp)


def main():
    JOB_SIZE = 10_000

    jobs = [(f'/infiles/{i}.zarr', f'/outfiles/{i}.zarr') for i in range(JOB_SIZE)]

    client = dd.Client()
    futures = client.map(mappable_task, jobs)

    dd.wait(futures)
I'm not sure whether this would involve changing the behavior of xarray, zarr, dd.get_client(), or something else.
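For what it's worth, the closest workaround I've come up with so far is forcing dask's local synchronous scheduler inside the task, or opening the store without dask chunking at all. I haven't verified that either reliably keeps work off the cluster scheduler in this daskhub setup, so this is only a sketch of the idea (mappable_task_local is just an illustrative name):

import dask
import xarray as xr


def mappable_task_local(args):
    input_fp, output_fp = args

    # assumption: a config-level scheduler override takes priority over the
    # worker's default distributed client, so .load() performs the chunk
    # reads locally instead of submitting them back to the cluster scheduler
    with dask.config.set(scheduler='synchronous'):
        ds = xr.open_zarr(input_fp).load()
        res = ds * 2
        res.to_zarr(output_fp)

    # alternative (also an assumption): open without dask chunking so no task
    # graph is created and the variables are read as plain NumPy-backed arrays
    # ds = xr.open_zarr(input_fp, chunks=None)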
MRE
The above can be adapted into a testable example. The goal is to see no tasks other than the main mapped function. I ran the following in a JupyterLab IPython notebook and watched the tasks with dask-labextension (the scheduler dashboard shows the same thing):
Imports
import xarray as xr
import dask.distributed as dd
import numpy as np
import os
import datetime
import shutil
Test file setup
shutil.rmtree('infiles', ignore_errors=True)
shutil.rmtree('outfiles', ignore_errors=True)
os.makedirs('infiles', exist_ok=True)
os.makedirs('outfiles', exist_ok=True)
# create two Zarr stores, each with 1000 chunks. This isn't an uncommon
# structure, though each chunk would normally have far more data
for i in range(2):
    ds = xr.Dataset(
        {'var1': (('dim1', 'dim2', ), np.random.random(size=(1000, 100)))},
        coords={'dim1': np.arange(1000), 'dim2': np.arange(100)},
    ).chunk({'dim1': 1})
    ds.to_zarr(f'infiles/data_{i}.zarr')
Function definition
def mappable_task(args):
    input_fp, output_fp = args

    # in our workflow we're reading/writing to google cloud storage using
    # gcsfs.GCSFileSystem.get_mapper
    # https://gcsfs.readthedocs.io/en/latest/api.html
    ds = xr.open_zarr(input_fp).load()

    # some long-running operation on the data, which depending on our
    # use case has run from nonlinear transforms to geospatial ops to
    # calling hydrodynamics models
    res = ds * 2

    res.to_zarr(output_fp)
Create a client and view the dashboard
client = dd.Client()
client
Map the jobs
JOB_SIZE = 2
jobs = [(f'infiles/data_{i}.zarr', f'outfiles/out_{i}.zarr') for i in range(JOB_SIZE)]
futures = client.map(mappable_task, jobs)
dd.wait(futures);
Cleanup (if running again)
shutil.rmtree('outfiles', ignore_errors=True)
os.makedirs('outfiles', exist_ok=True)
# refresh the client (in case of running multiple times)
client.restart()
Full cleanup
shutil.rmtree('infiles', ignore_errors=True)
shutil.rmtree('outfiles', ignore_errors=True)
client.close();
Note that thousands of tasks are scheduled even though there are only two jobs.
I'm using a conda environment that includes the following (among many other packages):
dask 2.30.0 py_0 conda-forge
dask-gateway 0.9.0 py38h578d9bd_0 conda-forge
dask-labextension 3.0.0 py_0 conda-forge
jupyter-server-proxy 1.5.0 py_0 conda-forge
jupyter_client 6.1.7 py_0 conda-forge
jupyter_core 4.7.0 py38h578d9bd_0 conda-forge
jupyter_server 1.1.3 py38h578d9bd_0 conda-forge
jupyter_telemetry 0.1.0 pyhd8ed1ab_1 conda-forge
jupyterhub 1.2.2 py38h578d9bd_0 conda-forge
jupyterhub-base 1.2.2 py38h578d9bd_0 conda-forge
jupyterlab 2.2.9 py_0 conda-forge
jupyterlab_server 1.2.0 py_0 conda-forge
jupyterlab_widgets 1.0.0 pyhd8ed1ab_1 conda-forge
kubernetes 1.18.8 0 conda-forge
kubernetes-client 1.18.8 haa36a5b_0 conda-forge
kubernetes-node 1.18.8 haa36a5b_0 conda-forge
kubernetes-server 1.18.8 haa36a5b_0 conda-forge
nb_conda_kernels 2.3.1 py38h578d9bd_0 conda-forge
nbclient 0.5.1 py_0 conda-forge
nodejs 15.2.1 h914e61d_0 conda-forge
notebook 6.1.6 py38h578d9bd_0 conda-forge
numpy 1.19.4 py38hf0fd68c_1 conda-forge
pandas 1.1.5 py38h51da96c_0 conda-forge
python 3.8.6 h852b56e_0_cpython conda-forge
python-kubernetes 11.0.0 py38h32f6830_0 conda-forge
xarray 0.16.2 pyhd8ed1ab_0 conda-forge
zarr 2.6.1 pyhd8ed1ab_0 conda-forge