SO中已经有一个很好的问题,但最好的答案现在是5岁,所以我认为2018年应该有更好的选择。
我目前正在寻找大于内存数据集的特征工程管道(使用合适的 dtypes)。
初始文件是不适合内存的 csv。以下是我的需求:
- 创建特征(主要是在多列上使用 groupby 操作。)
- 将新特征合并到以前的数据(在磁盘上,因为它不适合内存)
- 为某些 ML 应用程序使用子集(或所有)列/索引
- 重复 1/2/3(这是一个迭代过程,例如第 1 天:创建 4 个特征,第 2 天:再创建 4 个......)
尝试使用镶木地板和 dask:
首先,我将大 csv 文件拆分为多个小的“镶木地板”文件。有了这个,dask 对计算新特征非常有效,但是,我需要将它们合并到初始数据集和 atm,我们不能向 parquet 文件添加新列。逐块读取 csv,合并和重新保存到多个 parquet 文件太耗时,因为特征工程在这个项目中是一个迭代过程。
尝试使用 HDF 和 dask:
然后我转向 HDF,因为我们可以添加列并使用特殊查询,而且它仍然是二进制文件存储。我再次将大 csv 文件拆分为多个 HDF,并使用相同的 key='base' 作为基本功能,以便使用 DASK 的并发写入(HDF 不允许)。
data = data.repartition(npartitions=10) # otherwise it was saving 8Mo files using to_hdf
data.to_hdf('./hdf/data-*.hdf', key='base', format='table', data_columns=['day'], get=dask.threaded.get)
(附件问题:指定 data_columns 似乎对 dask 没用,因为 dask.read_hdf 中没有“哪里”?)
与我的预期不同,我无法使用如下代码将新功能合并到多个小文件中:
data = dd.read_hdf('./hdf/data-*.hdf', key='base')
data['day_pow2'] = data['day']**2
data['day_pow2'].to_hdf('./hdf/data-*.hdf', key='added', get=dask.threaded.get)
使用 dask.threaded 2% 后我得到“python 停止工作”。使用 dask.multiprocessing.get 它需要永远创建新文件
最适合此工作流程的工具(存储和处理)是什么?