我正在将数据从 netcdf 传输和重新分块到 zarr。该过程很慢,并且没有使用太多的 CPU。我尝试了几种不同的配置,有时它似乎做得稍微好一点,但效果不佳。有没有人有任何提示可以更有效地运行?
最后一次尝试(以及之前的一些尝试,可能是所有尝试)(使用单机、分布式调度程序和使用线程)日志给出了以下消息:
Distributed.core - 信息 - 事件循环在 Worker 中无响应 10.05 秒。这通常是由长时间运行的 GIL 持有函数或移动大块数据引起的。
以前我遇到过内存用完的错误,所以我正在使用下面的“stepwise_to_zarr”函数分段编写 zarr:
def stepwise_to_zarr(dataset, step_dim, step_size, chunks, out_loc, group):
start = dataset[step_dim].min()
end = dataset[step_dim].max()
iis = np.arange(start, end, step_size)
if end > iis[-1]:
iis = np.append(iis, end)
lon=dataset.get_index(step_dim)
first = True
failures = []
for i in range(1,len(iis)):
lower, upper = (iis[i-1], iis[i])
if upper >= end:
lon_list= [l for l in lon if lower <= l <= upper]
else:
lon_list= [l for l in lon if lower <= l < upper]
sub = dataset.sel(longitude=lon_list)
rechunked_sub = sub.chunk(chunks)
write_sync=zarr.ThreadSynchronizer()
if first:
rechunked_sub.to_zarr(out_loc, group=group,
consolidated=True, synchronizer=write_sync, mode="w")
first = False
else:
rechunked_sub.to_zarr(out_loc, group=group,
consolidated=True, synchronizer=write_sync, append_dim=step_dim)
chunks = {'time':8760, 'latitude':21, 'longitude':20}
ds = xr.open_mfdataset("path to data", parallel=True, combine="by_coords")
stepwise_to_zarr(ds, step_size=10, step_dim="longitude",
chunks=chunks, out_loc="path to output", group="group name")
在上图中,利用率从约 6% 下降到约 0.5% 似乎与完成 10 度纬度的第一批“批次”相吻合。
背景资料:
- 我正在使用 32 个 vCPU 和 256 GB 内存的单个 GCE 实例。
- 数据大约 600 GB,分布在大约 150 个 netcdf 文件中。
- 数据在 GCS 中,我正在使用 Cloud Storage FUSE 读取和写入数据。
- 我将数据从块大小重新分块:{'time':1,'latitude':521,'longitude':1440} 到块大小:{'time':8760,'latitude':21,'longitude':20}
我努力了:
- 使用默认的多处理调度程序
- 对单机(https://docs.dask.org/en/latest/setup/single-distributed.html)使用分布式调度程序,processs=True 和 processes=False。
- 分布式调度程序和默认的多处理调度程序,同时还设置环境变量以避免过度订阅线程,如下所示:
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1