4

由于数据量大,使用当前版本的dask ('0.7.5', github: [a1]) 时,我能够通过dask.dataframe api 执行分区计算。但是对于作为记录存储在 bcolz ('0.12.1', github: [a2]) 中的大型 DataFrame,我在执行此操作时遇到了 IndexError:

import dask.dataframe as dd
import bcolz

ctable = bcolz.open('test.bcolz', mode='r')
df_dd = dd.from_bcolz(ctable, chunksize=int(1E6))

# some calculations
z1 = (df_dd[['c0', 'c1']]*np.r_[1, 1j]).sum(axis=1)
z2 = (df_dd[['c2', 'c3']]*np.r_[1, 1j]).sum(axis=1)

df_dd_out = dd.concat([z1.to_frame('z1'), z2.to_frame('z2')], axis=1)

# actual computation
df_dd_out.compute()

错误是(缩写的回溯输出):

# ...      
    File "/usr/local/lib/python3.5/dist-packages/dask/async.py", line 481, in get_async
        raise(remote_exception(res, tb))
dask.async.IndexError: index out of bounds

实际上,只有在执行 dd.concat 操作时才会出现错误。就像是

out = (z1.to_frame('z1') + z2.to_frame('z2')).compute()

正在工作。

但是,当在内存中读取部分数据时,在某些情况下也会出现此错误,至少对于分区长度 (npartition) >1 和特定数据大小而言。

ctable_mem_b = ctable[:int(1E7)] # larger in-memory copy
df_dd_mem_b = dd.from_pandas(pd.DataFrame.from_records(ctable_mem_b),
                             npartitions=10)

请参阅完整的测试代码_test_dask_error.py和带有回溯的完整输出_test_out.txt

实际上,在那一步我停止了调查,因为我不知道如何在 async.py 中将这个错误调试为根本原因。当然,我会将其报告为错误(如果没有提示用户/使用错误)。但是:如何进行调试以找到根本原因?

_[a1]:_https://github.com/blaze/dask/tree/077b1b82ad03f855a960d252df2aaaa72b5b1cc5

_[a2]:_https://github.com/Blosc/bcolz/tree/562fd3092d1fee17372c11cadca54d1dab10cf9a

4

2 回答 2

1

摘自dask 文档常见问题解答

问:使用 dask 时如何调试我的程序?

如果您想深入了解 Python 调试器,一个常见的挫败原因是异步调度程序,因为它们在不同的工作程序上运行您的代码,因此无法提供对 Python 调试器的访问。幸运的是,您可以更改为同步调度程序,例如 dask.getdask.async.get_sync通过向方法提供get=关键字compute::

my_array.compute(get=dask.async.get_sync)

两者都dask.async.get_syncdask.get提供回溯遍历。 dask.async.get_sync使用与异步调度程序相同的机制,但只有一名工作人员。 dask.get非常简单,但不缓存数据,因此对于某些工作负载可能会很慢。

评论

我很想知道问题是什么。如果使用上述方法后原因不是很明显,那么我建议在dask issue tracker上提出问题。

于 2015-12-23T03:50:20.190 回答
0

使用后

df_dd_mem_b.compute(get=dask.async.get_sync)

很明显,错误

#...
    File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/core.py", line 1637, in _loc
        result = df.loc[start:stop]

发生了,因为_loc没有给出确切的边界,而是stop超出了边界

df_dd_out_mem_b.divisions

输出:(0、1000000、2000000、3000000、4000000、5000000、6000000、7000000、8000000、9000000、9999999)

并且在中断的任务中被(例如)调用

df.loc[1000000:2000000]

虽然最后一个索引标签是 1999999。

问题在于pandas.DataFrame.loc它给出了:“允许的输入是:[...] 带有标签'a':'f'的切片对象,(请注意,与通常的python切片相反,开始和停止都包括在内!) "(取自文档稳定版本,0.17.1)。显然对于小数字不会引发越界错误,但对于大数字(i>~1E6),我在这个测试中得到了 IndexError:

df = pd.DataFrame({0: range(i)}).loc[0:i]

对于 pd.DataFrame.iloc,根据文档,这种不确定的行为似乎确实不是问题:“如果请求的索引器超出范围,.iloc 将引发 IndexError,但允许超出范围索引的切片索引器除外。”,和确实,简短的测试在这里没有显示不规则的越界错误:

df = pd.DataFrame({0: range(i)}).iloc[0:i]

对于给定的 dask 问题,它肯定不是一个适当的解决方案,因为_loc它写得更通用,但最终只适用于本质上是特定调用

result = df.loc[slice(*df.index[[0, -1]])]
于 2015-12-24T01:06:08.260 回答