29

pandasread_csv函数似乎没有稀疏选项。我有 csv 数据,其中包含大量零(它压缩得非常好,并且剥离任何0值会将其缩小到几乎是原始大小的一半)。

我试过先用read_csv然后调用将它加载到一个密集的矩阵中to_sparse,但是它需要很长时间并且会阻塞文本字段,尽管大部分数据都是浮点数。如果我pandas.get_dummies(df)首先调用将分类列转换为 1 和 0,然后调用to_sparse(fill_value=0)它会花费大量时间,这比我对具有 1200 万个条目(大部分为零)的大多数数字表的预期要长得多。即使我从原始文件中删除零并调用to_sparse()(以便填充值为 NaN),也会发生这种情况。无论我是否通过kind='block'kind='integer'.

除了手动构建稀疏数据框之外,是否有一种好的、流畅的方法可以直接加载稀疏的 csv 而不会占用大量不必要的内存?


下面是一些用于创建具有 3 列浮点数据和 1 列文本数据的示例数据集的代码。大约 85% 的浮点值为零,CSV 的总大小约为 300 MB,但您可能希望使其更大以真正测试内存限制。

np.random.seed(123)
df=pd.DataFrame( np.random.randn(10000000,3) , columns=list('xyz') )
df[ df < 1.0 ] = 0.0
df['txt'] = np.random.choice( list('abcdefghij'), size=len(df) )
df.to_csv('test.csv',index=False)

这是一种简单的阅读方式,但希望有更好,更有效的方式:

sdf = pd.read_csv( 'test.csv', dtype={'txt':'category'} ).to_sparse(fill_value=0.0)

编辑添加(来自 JohnE): 如果可能,请在您的答案中提供一些有关读取大型 CSV 的相关性能统计信息,包括有关您如何测量内存效率的信息(特别是因为内存效率比时钟时间更难测量)。特别要注意,如果内存效率更高,较慢的(时钟时间)答案可能是这里的最佳答案。

4

2 回答 2

21

我可能会通过使用dask以流方式加载数据来解决这个问题。例如,您可以按如下方式创建一个 dask 数据框:

import dask.dataframe as ddf
data = ddf.read_csv('test.csv')

这个data对象此时实际上还没有做任何事情。它只包含一个“配方”,可以以可管理的块从磁盘读取数据帧。如果要具体化数据,可以调用compute()

df = data.compute().reset_index(drop=True)

此时,您有一个标准的 pandas 数据框(我们之所以这样称呼reset_index,是因为默认情况下每个分区都是独立索引的)。结果相当于你pd.read_csv直接调用得到的结果:

df.equals(pd.read_csv('test.csv'))
# True

dask 的好处是您可以在此“配方”中添加说明以构建数据框;例如,您可以使数据的每个分区稀疏,如下所示:

data = data.map_partitions(lambda part: part.to_sparse(fill_value=0))

此时,调用compute()将构造一个稀疏数组:

df = data.compute().reset_index(drop=True)
type(df)
# pandas.core.sparse.frame.SparseDataFrame

剖析

要检查 dask 方法与 raw pandas 方法的比较,让我们进行一些线路分析。我将使用lprunand mprun,如此所述(完全披露:这是我自己书中的一部分)。

假设你在 Jupyter notebook 中工作,你可以这样运行它:

首先,创建一个包含我们要执行的基本任务的单独文件:

%%file dask_load.py

import numpy as np
import pandas as pd
import dask.dataframe as ddf

def compare_loads():
    df = pd.read_csv('test.csv')
    df_sparse = df.to_sparse(fill_value=0)

    df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    df_dask = df_dask.compute().reset_index(drop=True)

接下来让我们对计算时间进行逐行分析:

%load_ext line_profiler

from dask_load import compare_loads
%lprun -f compare_loads compare_loads()

我得到以下结果:

Timer unit: 1e-06 s

Total time: 13.9061 s
File: /Users/jakevdp/dask_load.py
Function: compare_loads at line 6

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     6                                           def compare_loads():
     7         1      4746788 4746788.0     34.1      df = pd.read_csv('test.csv')
     8         1       769303 769303.0      5.5      df_sparse = df.to_sparse(fill_value=0)
     9                                           
    10         1        33992  33992.0      0.2      df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    11         1         7848   7848.0      0.1      df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    12         1      8348217 8348217.0     60.0      df_dask = df_dask.compute().reset_index(drop=True)

我们看到大约 60% 的时间用于 dask 调用,而大约 40% 的时间用于上述示例数组的 pandas 调用。这告诉我们,对于这个任务,dask 比 pandas 慢大约 50%:这是意料之中的,因为数据分区的分块和重组会导致一些额外的开销。

dask 的亮点在于内存使用:让我们使用mprun逐行的内存配置文件:

%load_ext memory_profiler
%mprun -f compare_loads compare_loads()

我机器上的结果是这样的:

Filename: /Users/jakevdp/dask_load.py

Line #    Mem usage    Increment   Line Contents
================================================
     6     70.9 MiB     70.9 MiB   def compare_loads():
     7    691.5 MiB    620.6 MiB       df = pd.read_csv('test.csv')
     8    828.8 MiB    137.3 MiB       df_sparse = df.to_sparse(fill_value=0)
     9                             
    10    806.3 MiB    -22.5 MiB       df_dask = ddf.read_csv('test.csv', blocksize=10E6)
    11    806.4 MiB      0.1 MiB       df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
    12    947.9 MiB    141.5 MiB       df_dask = df_dask.compute().reset_index(drop=True)

我们看到最终的 pandas 数据帧大小约为 140MB,但 pandas 在将数据读入临时密集对象时使用了约 620MB。

另一方面,dask 在加载数组和构建最终稀疏结果时总共只使用了大约 140MB。如果您正在读取其密集大小与系统上可用内存相当的数据,dask 具有明显的优势,尽管计算时间慢了约 50%。


但对于处理大数据,您不应止步于此。大概您正在对数据进行一些操作,而 dask 数据框抽象允许您在实现数据之前执行这些操作(即将它们添加到“配方”中)。因此,如果您对数据所做的事情涉及算术、聚合、分组等,您甚至不需要担心稀疏存储:只需对 dask 对象执行这些操作,最后调用compute(),dask 将占用关心以内存有效的方式应用它们。

因此,例如,我可以max()使用 dask 数据框计算每列的 ,而不必一次将整个内容加载到内存中:

>>> data.max().compute()
x      5.38114
y      5.33796
z      5.25661
txt          j
dtype: object

直接使用 dask 数据帧可以让您避免对数据表示的担忧,因为您可能永远不必一次将所有数据加载到内存中。

祝你好运!

于 2018-01-05T16:42:57.673 回答
10

这是一个主要作为基准提供的答案。希望有比这更好的方法。

chunksize = 1000000       # perhaps try some different values here?
chunks = pd.read_csv( 'test.csv', chunksize=chunksize, dtype={'txt':'category'} )
sdf = pd.concat( [ chunk.to_sparse(fill_value=0.0) for chunk in chunks ] )

正如@acushner 所指出的,您可以改为将其作为生成器表达式执行:

sdf = pd.concat( chunk.to_sparse(fill_value=0.0) for chunk in chunks )

虽然在我的测试中我没有看到任何大的差异,但也许你可能会使用不同的数据,但似乎大家一致认为这比列表组合方式更好。

我希望报告一些关于各种方法的内存分析,但很难获得一致的结果,我怀疑是因为 python 总是在后台清理内存,导致结果中添加了一些随机噪声。(在对 Jake 回答的评论中,他建议在每次之前重新启动 jupyter 内核%memit以获得更一致的结果,但我还没有尝试过。)

但我确实一直发现(使用%%memit)上面的分块读取和@jakevdp 的 dask 方法都使用了大约一半内存的东西,作为 OP 中的幼稚方法。有关 profiling 的更多信息,您应该查看 Jake 的《Python 数据科学手册》一书中的“Profiling and Timing Code”。

于 2018-01-03T13:13:27.283 回答