1

我一直在使用以下代码构建一个 GRIB 文件读取工具。我有异构文件,所以我不能使用 xarray.open_mfdatasets 或类似的东西。

def create_open_delays(file_list: List[str],expected_dataset_count:int) -> List[Delayed]:
    """
    This function runs through list of files and creates a
    list of delayed open commands.
    """
    return [
        dask.delayed(cfgrib.open_datasets,nout=expected_dataset_count)(file,
                                           backend_kwargs={
                                               "indexpath": ""
                                           },
                                           cache=True) for file in file_list
    ]

在运行代码时,我注意到与完全并行处理相比,以线程并行运行的性能降低了 10 倍(每个 Dask 工作人员只有 1 个线程)。我猜这与 GIL 有关,我猜任何人都不会感到惊讶。DASK 文档确实强调这是一个优化机会。拥有这么多工人有一些缺点,因为他们现在的内存有限,并且启动所有工人需要额外的开销,更不用说更多的进程通信了。每个任务大约需要 10 秒,所以我不关心 Dask.delayed 的开销。

我有两个问题:

  1. 下层 CFGrib/Eccodes 包中有什么可以提高多线程性能的吗?根据我的模糊理解,numpy 等在底层编译代码中采取步骤来释放 GIL?
  2. 是否可以利用 DASK 中新的 asyncIO python 功能?(我并没有要求任何人立即开发它,我只是想知道这样的东西是否存在或正在开发中,是否是一个愚蠢的想法)

谢谢。

4

1 回答 1

0

如果您还没有看过它,我建议您看看 Xarray,至少看看他们如何处理 Grib。

于 2020-11-03T20:16:31.613 回答