由于数据量大,使用当前版本的dask ('0.7.5', github: [a1]) 时,我能够通过dask.dataframe api 执行分区计算。但是对于作为记录存储在 bcolz ('0.12.1', github: [a2]) 中的大型 DataFrame,我在执行此操作时遇到了 IndexError:
import dask.dataframe as dd
import bcolz
ctable = bcolz.open('test.bcolz', mode='r')
df_dd = dd.from_bcolz(ctable, chunksize=int(1E6))
# some calculations
z1 = (df_dd[['c0', 'c1']]*np.r_[1, 1j]).sum(axis=1)
z2 = (df_dd[['c2', 'c3']]*np.r_[1, 1j]).sum(axis=1)
df_dd_out = dd.concat([z1.to_frame('z1'), z2.to_frame('z2')], axis=1)
# actual computation
df_dd_out.compute()
错误是(缩写的回溯输出):
# ...
File "/usr/local/lib/python3.5/dist-packages/dask/async.py", line 481, in get_async
raise(remote_exception(res, tb))
dask.async.IndexError: index out of bounds
实际上,只有在执行 dd.concat 操作时才会出现错误。就像是
out = (z1.to_frame('z1') + z2.to_frame('z2')).compute()
正在工作。
但是,当在内存中读取部分数据时,在某些情况下也会出现此错误,至少对于分区长度 (npartition) >1 和特定数据大小而言。
ctable_mem_b = ctable[:int(1E7)] # larger in-memory copy
df_dd_mem_b = dd.from_pandas(pd.DataFrame.from_records(ctable_mem_b),
npartitions=10)
请参阅完整的测试代码_test_dask_error.py和带有回溯的完整输出_test_out.txt。
实际上,在那一步我停止了调查,因为我不知道如何在 async.py 中将这个错误调试为根本原因。当然,我会将其报告为错误(如果没有提示用户/使用错误)。但是:如何进行调试以找到根本原因?
_[a1]:_https://github.com/blaze/dask/tree/077b1b82ad03f855a960d252df2aaaa72b5b1cc5
_[a2]:_https://github.com/Blosc/bcolz/tree/562fd3092d1fee17372c11cadca54d1dab10cf9a