1

我正在将数据从 netcdf 传输和重新分块到 zarr。该过程很慢,并且没有使用太多的 CPU。我尝试了几种不同的配置,有时它似乎做得稍微好一点,但效果不佳。有没有人有任何提示可以更有效地运行?

最后一次尝试(以及之前的一些尝试,可能是所有尝试)(使用单机、分布式调度程序和使用线程)日志给出了以下消息:

Distributed.core - 信息 - 事件循环在 Worker 中无响应 10.05 秒。这通常是由长时间运行的 GIL 持有函数或移动大块数据引起的。

CPU 利用率。 它开始时很低,然后下降得更低。

以前我遇到过内存用完的错误,所以我正在使用下面的“stepwise_to_zarr”函数分段编写 zarr:

def stepwise_to_zarr(dataset, step_dim, step_size, chunks, out_loc, group):
    start = dataset[step_dim].min()
    end = dataset[step_dim].max()
    iis = np.arange(start, end, step_size)
    if end > iis[-1]:
          iis = np.append(iis, end)
            
    lon=dataset.get_index(step_dim)
    
    first = True
    failures = []
    for i in range(1,len(iis)):
        lower, upper = (iis[i-1], iis[i])
        if upper >= end:
            lon_list= [l for l in lon if lower <= l <= upper]

        else:
            lon_list= [l for l in lon if lower <= l < upper]
        sub = dataset.sel(longitude=lon_list)

        rechunked_sub = sub.chunk(chunks)
        write_sync=zarr.ThreadSynchronizer()

        if first:
            rechunked_sub.to_zarr(out_loc, group=group, 
                consolidated=True, synchronizer=write_sync, mode="w")
            first = False
        else:
            rechunked_sub.to_zarr(out_loc, group=group, 
                consolidated=True, synchronizer=write_sync, append_dim=step_dim)


chunks = {'time':8760, 'latitude':21,  'longitude':20}
ds = xr.open_mfdataset("path to data", parallel=True, combine="by_coords")
stepwise_to_zarr(ds, step_size=10, step_dim="longitude", 
    chunks=chunks, out_loc="path to output", group="group name")

在上图中,利用率从约 6% 下降到约 0.5% 似乎与完成 10 度纬度的第一批“批次”相吻合。

背景资料:

  • 我正在使用 32 个 vCPU 和 256 GB 内存的单个 GCE 实例。
  • 数据大约 600 GB,分布在大约 150 个 netcdf 文件中。
  • 数据在 GCS 中,我正在使用 Cloud Storage FUSE 读取和写入数据。
  • 我将数据从块大小重新分块:{'time':1,'latitude':521,'longitude':1440} 到块大小:{'time':8760,'latitude':21,'longitude':20}

我努力了:

  • 使用默认的多处理调度程序
  • 对单机(https://docs.dask.org/en/latest/setup/single-distributed.html)使用分布式调度程序,processs=True 和 processes=False。
  • 分布式调度程序和默认的多处理调度程序,同时还设置环境变量以避免过度订阅线程,如下所示:
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1

如最佳实践中所述(https://docs.dask.org/en/latest/array-best-practices.html?highlight=export#avoid-oversubscribing-threads

4

1 回答 1

0

我最终通过写入带有块的中间 Zarr 存储解决了我的问题:{'time':8760, 'latitude':260, 'longitude':360}。这进展很快,尽管 cpu 资源仅在相对较小的部分工作中得到充分利用。然后,我使用问题中描述的逐步过程的修改版本读取了这个中间 zarr 并存储在最终的分块中。这给出了可接受的性能,尽管并不理想。

写入中间存储时的 CPU 利用率

从中间存储写入最终存储时的 CPU 利用率

这是代码:

def stepwise_to_zarr(dataset, step_dim, step_size, encoding, out_loc, group, include_end=True):
    start = dataset[step_dim].min()
    end = dataset[step_dim].max()
    iis = np.arange(start, end, step_size)
    if end > iis[-1]:
          iis = np.append(iis, end)

    lon=dataset.get_index(step_dim)
    first = True
    failures = []
    for i in range(1,len(iis)):
        lower, upper = (iis[i-1], iis[i])
        if upper >= end and include_end:
            lon_list= [l for l in lon if lower <= l <= upper]
        else:
            lon_list= [l for l in lon if lower <= l < upper]
    
        sub = dataset.sel(longitude=lon_list)
        write_sync=zarr.ThreadSynchronizer()
        if first:
            sub_write=sub.to_zarr(output_loc,
                           group=varname,
                           consolidated=True,
                           synchronizer=write_sync,
                           encoding=encoding,
                           mode="w", compute=False)
            first = False
        else:
            sub_write=sub.to_zarr(output_loc,
                   group=varname,
                   consolidated=True,
                   synchronizer=write_sync,
                   append_dim=step_dim,
                   compute=False)
        sub_write.compute(retries=2)

z = xr.open_zarr(input_loc, group=groupname)
new_chunks = {'time':8760, 'latitude':21,  'longitude':20}
z_rechunked=z.chunk(new_chunks)

#Workaround to avoid:NotImplementedError: Specified zarr chunks (8760, 260, 360) would #overlap multiple dask chunks
#See https://github.com/pydata/xarray/issues/2300
encoding = {}
for v in ['var1', 'var2', 'var3']:
    encoding.update({v:z[v].encoding.copy()})
    encoding[v]["chunks"]=(96408, 21, 20)

stepwise_to_zarr(z_rechunked, "longitude", 60, encoding, output_loc, group=groupname)

注意我必须覆盖编码才能重新分块 zarrs。

这个过程有效,但有点麻烦。我之所以这样做,是因为我没有听说过 rechunker。下次我重新分块时,我会尝试重新分块来解决这个问题。

于 2020-09-16T18:31:58.513 回答