我正在尝试将迭代器通过(非标准)类似文件的对象传递给dask.delayed
函数。当我尝试时compute()
,我从 dask 收到以下消息,以及下面的回溯。
distributed.protocol.pickle - INFO - Failed to serialize
([<items>, ... ], OrderedDict(..)).
Exception: self.ptr cannot be converted to a Python object for pickling
Traceback (most recent call last):
File "/home/user/miniconda3/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 38, in dumps
result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
File "stringsource", line 2, in pysam.libcbcf.VariantRecord.__reduce_cython__
TypeError: self.ptr cannot be converted to a Python object for pickling
源代码的对应部分如下所示:
delayed(to_arrow)(vf.fetch(..), ordered_dict)
vf
是类文件对象,并vf.fetch(..)
返回文件中存在的记录的迭代器(这是一个VCF 文件,我正在使用pysam
库来读取它)。我希望这提供了足够的背景。
来自的消息dask
显示迭代发生在函数调用期间而不是函数内部,这让我相信传递迭代器可能是不合适的。所以我做了一个快速检查sum(range(..))
,这似乎有效。现在我很困惑,我错过了什么?
为此提供一个最小的工作示例有点困难。但也许以下内容会有所帮助。
- 从这里下载一个 VCF 文件(和它的索引):说,
ALL.chrY*vcf.gz{,.tbi}
pip3 install --user pysam
- 打开文件:
vf = VariantFile('/path/to/file.vcf.gz', mode='r')
- 像这样的迭代器:
vf.fetch("Y", 2_600_000, 2_700_000)
- 对于延迟函数,您可以有一个空循环。