2

SO中已经有一个很好的问题,但最好的答案现在是5岁,所以我认为2018年应该有更好的选择。

我目前正在寻找大于内存数据集的特征工程管道(使用合适的 dtypes)。

初始文件是不适合内存的 csv。以下是我的需求:

  1. 创建特征(主要是在多列上使用 groupby 操作。)
  2. 将新特征合并到以前的数据(在磁盘上,因为它不适合内存)
  3. 为某些 ML 应用程序使用子集(或所有)列/索引
  4. 重复 1/2/3(这是一个迭代过程,例如第 1 天:创建 4 个特征,第 2 天:再创建 4 个......)

尝试使用镶木地板和 dask:

首先,我将大 csv 文件拆分为多个小的“镶木地板”文件。有了这个,dask 对计算新特征非常有效,但是,我需要将它们合并到初始数据集和 atm,我们不能向 parquet 文件添加新列。逐块读取 csv,合并和重新保存到多个 parquet 文件太耗时,因为特征工程在这个项目中是一个迭代过程。

尝试使用 HDF 和 dask:

然后我转向 HDF,因为我们可以添加列并使用特殊查询,而且它仍然是二进制文件存储。我再次将大 csv 文件拆分为多个 HDF,并使用相同的 key='base' 作为基本功能,以便使用 DASK 的并发写入(HDF 不允许)。

data = data.repartition(npartitions=10) # otherwise it was saving 8Mo files using to_hdf
data.to_hdf('./hdf/data-*.hdf', key='base', format='table', data_columns=['day'], get=dask.threaded.get)

附件问题:指定 data_columns 似乎对 dask 没用,因为 dask.read_hdf 中没有“哪里”?

与我的预期不同,我无法使用如下代码将新功能合并到多个小文件中:

data = dd.read_hdf('./hdf/data-*.hdf', key='base')
data['day_pow2'] = data['day']**2
data['day_pow2'].to_hdf('./hdf/data-*.hdf', key='added', get=dask.threaded.get) 

使用 dask.threaded 2% 后我得到“python 停止工作”。使用 dask.multiprocessing.get 它需要永远创建新文件

最适合此工作流程的工具(存储和处理)是什么?

4

2 回答 2

1

我将复制有关 fastparquet相关问题的评论:技术上可以将列添加到现有 parquet 数据集,但这在 fastparquet 中没有实现,也可能在任何其他 parquet 实现中也没有。

编写代码来执行此操作可能不会太繁重(但目前没有计划):写入列的调用顺序发生,因此用于写入的新列需要向下渗透到此函数,以及与当前对应的文件位置页脚中元数据的第一个字节。另外,架构需要单独更新(这很简单)。需要对数据集的每个文件重复该过程。这不是问题的“答案”,但也许有人想承担这项任务。

于 2018-04-01T19:19:54.327 回答
0

我会认真考虑使用数据库(索引访问)作为存储,甚至使用 Apache Spark(用于以分布式/集群方式处理数据)和 Hive / Impala 作为后端......

于 2018-03-29T11:38:37.883 回答