0

我正在尝试将迭代器通过(非标准)类似文件的对象传递给dask.delayed函数。当我尝试时compute(),我从 dask 收到以下消息,以及下面的回溯。

distributed.protocol.pickle - INFO - Failed to serialize 
  ([<items>, ... ], OrderedDict(..)).
Exception: self.ptr cannot be converted to a Python object for pickling

Traceback (most recent call last):
  File "/home/user/miniconda3/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 38, in dumps
    result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "stringsource", line 2, in pysam.libcbcf.VariantRecord.__reduce_cython__
TypeError: self.ptr cannot be converted to a Python object for pickling

源代码的对应部分如下所示:

delayed(to_arrow)(vf.fetch(..), ordered_dict)

vf是类文件对象,并vf.fetch(..)返回文件中存在的记录的迭代器(这是一个VCF 文件,我正在使用pysam库来读取它)。我希望这提供了足够的背景。

来自的消息dask显示迭代发生在函数调用期间而不是函数内部,这让我相信传递迭代器可能是不合适的。所以我做了一个快速检查sum(range(..)),这似乎有效。现在我很困惑,我错过了什么?

为此提供一个最小的工作示例有点困难。但也许以下内容会有所帮助。

  1. 从这里下载一个 VCF 文件(和它的索引):说,ALL.chrY*vcf.gz{,.tbi}
  2. pip3 install --user pysam
  3. 打开文件:vf = VariantFile('/path/to/file.vcf.gz', mode='r')
  4. 像这样的迭代器:vf.fetch("Y", 2_600_000, 2_700_000)
  5. 对于延迟函数,您可以有一个空循环。
4

1 回答 1

1

简短的回答是:重组你的延迟函数,使文件打开阶段发生函数内部,而你传递指向该特定文件所需的参数(例如,路径)。

如果你有兴趣,你可以看看 Dask 在内部是如何做到这一点的, class dask.bytes.core.OpenFile,这是一个可序列化的东西,它会延迟打开,直到它在一个with块中使用。这是一种方便的方法,但您可能可以做一些更简单的事情。

于 2018-11-18T15:43:14.233 回答