1149

在学习熊猫的过程中,我已经尝试了好几个月来想出这个问题的答案。我在日常工作中使用 SAS,它的核心支持非常棒。然而,由于许多其他原因,SAS 作为一个软件是可怕的。

有一天,我希望用 python 和 pandas 代替我对 SAS 的使用,但我目前缺乏大型数据集的核心工作流程。我不是在谈论需要分布式网络的“大数据”,而是说文件太大而无法放入内存但小到足以放入硬盘驱动器。

我的第一个想法是使用HDFStore将大型数据集保存在磁盘上,并仅将我需要的部分拉入数据帧进行分析。其他人提到 MongoDB 是一种更易于使用的替代方案。我的问题是这样的:

完成以下任务的最佳实践工作流程是什么:

  1. 将平面文件加载到永久的磁盘数据库结构中
  2. 查询该数据库以检索数据以输入 pandas 数据结构
  3. 在 Pandas 中操作片段后更新数据库

现实世界的例子将不胜感激,尤其是任何在“大数据”上使用 pandas 的人。

编辑——我希望它如何工作的一个例子:

  1. 迭代地导入一个大的平面文件并将其存储在一个永久的磁盘数据库结构中。这些文件通常太大而无法放入内存。
  2. 为了使用 Pandas,我想读取可以放入内存的这些数据的子集(通常一次只有几列)。
  3. 我将通过对选定列执行各种操作来创建新列。
  4. 然后我必须将这些新列附加到数据库结构中。

我正在尝试找到执行这些步骤的最佳实践方式。阅读有关 pandas 和 pytables 的链接似乎附加一个新列可能是一个问题。

编辑——具体回答杰夫的问题:

  1. 我正在建立消费者信用风险模型。数据种类包括电话、SSN、地址特征;财产价值;犯罪记录、破产等贬损信息……我每天使用的数据集平均有近 1,000 到 2,000 个混合数据类型的字段:数字和字符数据的连续变量、名义变量和有序变量。我很少追加行,但我确实执行了许多创建新列的操作。
  2. 典型的操作包括使用条件逻辑将多个列组合成一个新的复合列。例如,if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'。这些操作的结果是我的数据集中每条记录的新列。
  3. 最后,我想将这些新列附加到磁盘数据结构中。我将重复第 2 步,使用交叉表和描述性统计数据探索数据,试图找到有趣、直观的建模关系。
  4. 一个典型的项目文件通常约为 1GB。文件被组织成这样一种方式,其中一行包含消费者数据的记录。对于每条记录,每一行都有相同数量的列。情况将永远如此。
  5. 在创建新列时,我很少会按行进行子集化。但是,在创建报告或生成描述性统计数据时,对我来说,对行进行子集化是很常见的。例如,我可能想为特定业务线创建一个简单的频率,例如零售信用卡。为此,除了我想要报告的任何列之外,我只会选择那些业务范围 = 零售的记录。但是,在创建新列时,我会提取所有数据行,并且只提取操作所需的列。
  6. 建模过程要求我分析每一列,寻找与某些结果变量的有趣关系,并创建描述这些关系的新复合列。我探索的专栏通常是在小范围内完成的。例如,我将专注于一组 20 列仅处理财产价值的列,并观察它们与贷款违约的关系。一旦探索了这些并创建了新的专栏,我就会转到另一组专栏,比如大学教育,然后重复这个过程。我正在做的是创建候选变量来解释我的数据与某些结果之间的关系。在这个过程的最后,我应用了一些学习技术,从这些复合列中创建一个方程。

我很少会向数据集添加行。我几乎总是会创建新列(统计/机器学习用语中的变量或特征)。

4

16 回答 16

700

我经常以这种方式使用数十 GB 的数据,例如,我在磁盘上有表,我通过查询读取、创建数据并附加回来。

值得阅读文档,并在此线程的后期获取有关如何存储数据的一些建议。

会影响您如何存储数据的详细信息,例如:
提供尽可能多的详细信息;我可以帮助你开发一个结构。

  1. 数据大小、行数、列数、列类型;你是追加行,还是只是列?
  2. 典型的操作会是什么样子。例如,对列进行查询以选择一堆行和特定列,然后执行操作(在内存中),创建新列,保存这些。
    (举一个玩具例子可以让我们提供更具体的建议。)
  3. 处理完之后,你会做什么?第 2 步是临时的还是可重复的?
  4. 输入平面文件:多少,以 Gb 为单位的粗略总大小。这些是如何组织的,例如按记录?每个文件是否包含不同的字段,还是每个文件都有一些记录以及每个文件中的所有字段?
  5. 您是否曾经根据标准选择行(记录)的子集(例如选择字段 A > 5 的行)?然后做一些事情,或者你只是选择所有记录的字段A,B,C(然后做一些事情)?
  6. 您是否“处理”所有列(分组),或者是否有一个很好的比例只能用于报告(例如,您想保留数据,但不需要明确地拉入该列,直到最终结果时间)?

解决方案

确保您至少0.10.1安装了 pandas。

逐块读取迭代文件和多个表查询

由于 pytables 被优化为按行操作(这是您查询的内容),我们将为每组字段创建一个表。这种方式很容易选择一小组字段(这将适用于一个大表,但这样做更有效......我想我将来可能能够解决这个限制......这是无论如何更直观):(
以下是伪代码。)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

读入文件并创建存储(主要是做什么append_to_multiple):

for f in files:
   # read in the file, additional options may be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

现在您在文件中拥有了所有表(实际上,如果您愿意,您可以将它们存储在单独的文件中,您可能必须将文件名添加到 group_map,但这可能不是必需的)。

这是获取列和创建新列的方式:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

当您准备好进行后期处理时:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

关于 data_columns,您实际上不需要定义任何data_columns;它们允许您根据列子选择行。例如:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

在最终报告生成阶段,它们可能对您最感兴趣(本质上,一个数据列与其他列分离,如果您定义很多,这可能会在一定程度上影响效率)。

您可能还想:

  • 创建一个函数,该函数采用字段列表,在 groups_map 中查找组,然后选择这些并连接结果,以便获得结果帧(这本质上是 select_as_multiple 所做的)。这样,结构对您来说将是非常透明的。
  • 某些数据列的索引(使行子集更快)。
  • 启用压缩。

有问题时告诉我!

于 2013-01-10T22:57:22.433 回答
166

我认为上面的答案缺少一种我发现非常有用的简单方法。

当我有一个文件太大而无法加载到内存中时,我会将文件分成多个较小的文件(按行或按列)

示例:如果有 30 天的价值约 30GB 的交易数据,我每天将其分解为约 1GB 大小的文件。我随后分别处理每个文件并在最后汇总结果

最大的优点之一是它允许并行处理文件(多个线程或进程)

另一个优点是文件操作(如示例中的添加/删除日期)可以通过常规 shell 命令完成,这在更高级/复杂的文件格式中是不可能的

这种方法并不涵盖所有场景,但在很多场景中都非常有用

于 2013-12-19T19:46:48.987 回答
115

现在,在问题发生两年后,一个“核心外”的 pandas 等价物:dask。太棒了!虽然它不支持所有 pandas 功能,但你可以用它走得很远。更新:在过去的两年中,它一直得到维护,并且有大量用户社区与 Dask 合作。

而现在,在提出问题四年后,在Vaex中出现了另一个高性能的“核心外”熊猫。它“使用内存映射、零内存复制策略和惰性计算来获得最佳性能(不浪费内存)。” 它可以处理数十亿行的数据集,并且不会将它们存储到内存中(甚至可以对次优硬件进行分析)。

于 2016-03-23T20:30:53.543 回答
73

如果您的数据集在 1 到 20GB 之间,您应该获得一个具有 48GB RAM 的工作站。然后 Pandas 可以将整个数据集保存在 RAM 中。我知道这不是您在这里寻找的答案,但是在具有 4GB RAM 的笔记本电脑上进行科学计算是不合理的。

于 2013-11-02T07:14:07.340 回答
66

我知道这是一个旧线程,但我认为Blaze库值得一试。它专为这些类型的情况而设计。

从文档:

Blaze 将 NumPy 和 Pandas 的可用性扩展到分布式和核外计算。Blaze 提供了一个类似于 NumPy ND-Array 或 Pandas DataFrame 的接口,但将这些熟悉的接口映射到各种其他计算引擎,如 Postgres 或 Spark。

编辑:顺便说一句,它得到了 ContinuumIO 和 NumPy 的作者 Travis Oliphant 的支持。

于 2014-12-03T22:09:40.410 回答
59

pymongo 就是这种情况。我还在 python 中使用 sql server、sqlite、HDF、ORM (SQLAlchemy) 进行了原型设计。首先,pymongo 是一个基于文档的数据库,因此每个人都是一个文档(dict属性)。许多人组成一个集合,您可以拥有多个集合(人、股票市场、收入)。

pd.dateframe -> pymongo 注意:我使用chunksizeinread_csv将其保持在 5 到 10k 条记录(如果更大,pymongo 会丢弃套接字)

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

查询:gt = 大于...

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))

.find()返回一个迭代器,所以我通常ichunked用来分割成更小的迭代器。

加入一个怎么样,因为我通常会将 10 个数据源粘贴在一起:

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))

然后(在我的情况下,有时我必须aJoinDF在其“可合并”之前先进行聚合。)

df = pandas.merge(df, aJoinDF, on=aKey, how='left')

然后您可以通过下面的更新方法将新信息写入您的主集合。(逻辑集合与物理数据源)。

collection.update({primarykey:foo},{key:change})

在较小的查找中,只需非规范化即可。例如,您在文档中有代码,您只需添加域代码文本并在dict创建文档时进行查找。

现在你有一个很好的基于一个人的数据集,你可以在每个案例上释放你的逻辑并制作更多属性。最后,您可以将您的 3 to memory max 关键指标读入 pandas 并进行枢轴/聚合/数据探索。这对我来说适用于 300 万条带有数字/大文本/类别/代码/浮点数/...的记录

您还可以使用 MongoDB 中内置的两种方法(MapReduce 和聚合框架)。有关聚合框架的更多信息,请参见此处,因为它似乎比 MapReduce 更容易,并且看起来很方便快速聚合工作。请注意,我不需要定义我的字段或关系,我可以将项目添加到文档中。在快速变化的 numpy、pandas、python 工具集的当前状态下,MongoDB 帮助我开始工作:)

于 2013-01-11T22:11:52.907 回答
54

我发现对大数据用例有用的一个技巧是通过将浮点精度降低到 32 位来减少数据量。它并不适用于所有情况,但在许多应用程序中,64 位精度是多余的,2 倍的内存节省是值得的。为了使一个明显的观点更加明显:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB

>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB
于 2017-03-26T05:59:45.757 回答
49

我发现这有点晚了,但我遇到了类似的问题(抵押预付款模型)。我的解决方案是跳过 pandas HDFStore 层并使用直接的 pytables。我在最终文件中将每一列保存为单独的 HDF5 数组。

我的基本工作流程是首先从数据库中获取 CSV 文件。我gzip它,所以它没有那么大。然后我将其转换为面向行的 HDF5 文件,通过在 python 中对其进行迭代,将每一行转换为实际数据类型,并将其写入 HDF5 文件。这需要几十分钟,但它不使用任何内存,因为它只是逐行操作。然后我将面向行的 HDF5 文件“转置”为面向列的 HDF5 文件。

表转置如下所示:

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

读回来看起来像:

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)

现在,我通常在具有大量内存的机器上运行它,所以我可能对内存使用不够小心。例如,默认情况下,加载操作会读取整个数据集。

这通常对我有用,但有点笨拙,而且我不能使用花哨的 pytables 魔法。

编辑:与记录数组 pytables 默认值相比,这种方法的真正优势在于,我可以使用无法处理表的 h5r 将数据加载到 R 中。或者,至少,我无法让它加载异构表。

于 2013-03-21T21:19:30.510 回答
37

正如其他人所指出的,几年后出现了一个“核心外”的 pandas 等价物:dask。尽管 dask 不是 pandas 的替代品,但它的所有功能都因其以下几个原因而脱颖而出:

Dask 是一个用于分析计算的灵活并行计算库,针对“大数据”集合(如并行数组、数据帧和列表)的动态任务调度进行了优化,这些集合将常见接口(如 NumPy、Pandas 或 Python 迭代器)扩展到更大-内存或分布式环境,并从笔记本电脑扩展到集群。

Dask 强调以下优点:

  • 熟悉:提供并行化的 NumPy 数组和 Pandas DataFrame 对象
  • 灵活:为更多自定义工作负载和与其他项目的集成提供任务调度界面。
  • Native:在纯 Python 中启用分布式计算,并访问 PyData 堆栈。
  • 快速:以低开销、低延迟和快速数值算法所需的最小序列化运行
  • 向上扩展:在具有 1000 个内核的集群上弹性运行 向下扩展:在单个进程中在笔记本电脑上轻松设置和运行
  • 响应:设计时考虑到交互式计算,它提供快速反馈和诊断以帮助人类

并添加一个简单的代码示例:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

像这样替换一些熊猫代码:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

并且,特别值得注意的是,通过concurrent.futures接口提供了一个通用的基础设施来提交自定义任务:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()
于 2017-11-22T03:55:40.313 回答
24

值得一提的是Ray
它是一个分布式计算框架,有自己的分布式pandas实现。

只需替换 pandas 导入,代码就可以正常工作:

# import pandas as pd
import ray.dataframe as pd

# use pd as usual

可以在这里阅读更多详细信息:

https://rise.cs.berkeley.edu/blog/pandas-on-ray/


更新:处理 pandas 分发的部分,已被提取到modin项目。

现在使用它的正确方法是:

# import pandas as pd
import modin.pandas as pd
于 2018-03-18T09:30:39.990 回答
21

另一种变化

在 pandas 中完成的许多操作也可以作为 db 查询(sql、mongo)来完成

使用 RDBMS 或 mongodb 允许您在 DB Query 中执行一些聚合(针对大数据进行了优化,并有效地使用缓存和索引)

稍后,您可以使用 pandas 进行后期处理。

这种方法的优点是您获得了处理大数据的数据库优化,同时仍然以高级声明性语法定义逻辑 - 而不必处理决定在内存中做什么和做什么的细节的核心。

尽管查询语言和 pandas 不同,但将部分逻辑从一种翻译到另一种通常并不复杂。

于 2015-04-28T05:22:21.403 回答
14

如果您采用创建数据管道的简单路径将其分解为多个较小的文件,请考虑使用Ruffus 。

于 2014-10-09T19:07:16.193 回答
13

我想指出 Vaex 包。

Vaex 是一个用于惰性核心数据帧(类似于 Pandas)的 Python 库,用于可视化和探索大型表格数据集。它可以在高达每秒十亿 (10 9 ) 个对象/行的 N 维网格上计算统计数据,例如平均值、总和、计数、标准差等。可视化是使用直方图、密度图和 3D 体绘制完成的,允许对大数据进行交互式探索。Vaex 使用内存映射、零内存复制策略和惰性计算来获得最佳性能(不浪费内存)。

看看文档:https ://vaex.readthedocs.io/en/latest/ API 非常接近 pandas 的 API。

于 2019-06-03T09:40:50.147 回答
11

我最近遇到了一个类似的问题。我发现只需以块的形式读取数据并在将其以块的形式写入相同的 csv 时将其附加效果很好。我的问题是根据另一个表中的信息添加日期列,使用某些列的值如下。这可能会帮助那些对 dask 和 hdf5 感到困惑但更熟悉像我这样的 pandas 的人。

def addDateColumn():
"""Adds time to the daily rainfall data. Reads the csv as chunks of 100k 
   rows at a time and outputs them, appending as needed, to a single csv. 
   Uses the column of the raster names to get the date.
"""
    df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True, 
                     chunksize=100000) #read csv file as 100k chunks

    '''Do some stuff'''

    count = 1 #for indexing item in time list 
    for chunk in df: #for each 100k rows
        newtime = [] #empty list to append repeating times for different rows
        toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
        while count <= toiterate.max():
            for i in toiterate: 
                if i ==count:
                    newtime.append(newyears[count])
            count+=1
        print "Finished", str(chunknum), "chunks"
        chunk["time"] = newtime #create new column in dataframe based on time
        outname = "CHIRPS_tanz_time2.csv"
        #append each output to same csv, using no header
        chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)
于 2016-10-04T15:32:56.040 回答
0

parquet 文件格式非常适合您描述的用例。您可以有效地读取特定的列子集pd.read_parquet(path_to_file, columns=["foo", "bar"])

https://pandas.pydata.org/docs/reference/api/pandas.read_parquet.html

于 2021-08-01T19:06:39.840 回答
-2

目前我正在“像”你一样工作,只是规模较小,这就是为什么我没有 PoC 来提供我的建议。

但是,我似乎在使用pickle作为缓存系统并将各种功能的执行外包到文件中取得了成功——从我的突击队/主文件中执行这些文件;例如,我使用 prepare_use.py 来转换对象类型,将数据集拆分为测试、验证和预测数据集。

你的泡菜缓存是如何工作的?我使用字符串来访问动态创建的pickle文件,具体取决于传递的参数和数据集(我尝试捕获并确定程序是否已经运行,使用.shape作为数据集,使用dict传递参数)。尊重这些措施,我得到一个字符串来尝试查找和读取 .pickle 文件,如果找到,可以跳过处理时间以跳转到我现在正在处理的执行。

使用数据库时我遇到了类似的问题,这就是为什么我在使用这个解决方案时感到很高兴,但是 - 肯定有很多限制 - 例如由于冗余而存储大量的泡菜集。可以通过适当的索引将表格从转换之前更新到转换之后 - 验证信息会打开另一本书(我尝试合并抓取的租金数据并在 2 小时后基本停止使用数据库 - 因为我想在之后跳回去每个转换过程)

我希望我的 2 美分能在某种程度上帮助你。

问候。

于 2020-01-08T14:05:14.410 回答