我一直在使用以下代码构建一个 GRIB 文件读取工具。我有异构文件,所以我不能使用 xarray.open_mfdatasets 或类似的东西。
def create_open_delays(file_list: List[str],expected_dataset_count:int) -> List[Delayed]:
"""
This function runs through list of files and creates a
list of delayed open commands.
"""
return [
dask.delayed(cfgrib.open_datasets,nout=expected_dataset_count)(file,
backend_kwargs={
"indexpath": ""
},
cache=True) for file in file_list
]
在运行代码时,我注意到与完全并行处理相比,以线程并行运行的性能降低了 10 倍(每个 Dask 工作人员只有 1 个线程)。我猜这与 GIL 有关,我猜任何人都不会感到惊讶。DASK 文档确实强调这是一个优化机会。拥有这么多工人有一些缺点,因为他们现在的内存有限,并且启动所有工人需要额外的开销,更不用说更多的进程通信了。每个任务大约需要 10 秒,所以我不关心 Dask.delayed 的开销。
我有两个问题:
- 下层 CFGrib/Eccodes 包中有什么可以提高多线程性能的吗?根据我的模糊理解,numpy 等在底层编译代码中采取步骤来释放 GIL?
- 是否可以利用 DASK 中新的 asyncIO python 功能?(我并没有要求任何人立即开发它,我只是想知道这样的东西是否存在或正在开发中,是否是一个愚蠢的想法)
谢谢。