我可能会通过使用dask以流方式加载数据来解决这个问题。例如,您可以按如下方式创建一个 dask 数据框:
import dask.dataframe as ddf
data = ddf.read_csv('test.csv')
这个data
对象此时实际上还没有做任何事情。它只包含一个“配方”,可以以可管理的块从磁盘读取数据帧。如果要具体化数据,可以调用compute()
:
df = data.compute().reset_index(drop=True)
此时,您有一个标准的 pandas 数据框(我们之所以这样称呼reset_index
,是因为默认情况下每个分区都是独立索引的)。结果相当于你pd.read_csv
直接调用得到的结果:
df.equals(pd.read_csv('test.csv'))
# True
dask 的好处是您可以在此“配方”中添加说明以构建数据框;例如,您可以使数据的每个分区稀疏,如下所示:
data = data.map_partitions(lambda part: part.to_sparse(fill_value=0))
此时,调用compute()
将构造一个稀疏数组:
df = data.compute().reset_index(drop=True)
type(df)
# pandas.core.sparse.frame.SparseDataFrame
剖析
要检查 dask 方法与 raw pandas 方法的比较,让我们进行一些线路分析。我将使用lprun
and mprun
,如此处所述(完全披露:这是我自己书中的一部分)。
假设你在 Jupyter notebook 中工作,你可以这样运行它:
首先,创建一个包含我们要执行的基本任务的单独文件:
%%file dask_load.py
import numpy as np
import pandas as pd
import dask.dataframe as ddf
def compare_loads():
df = pd.read_csv('test.csv')
df_sparse = df.to_sparse(fill_value=0)
df_dask = ddf.read_csv('test.csv', blocksize=10E6)
df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
df_dask = df_dask.compute().reset_index(drop=True)
接下来让我们对计算时间进行逐行分析:
%load_ext line_profiler
from dask_load import compare_loads
%lprun -f compare_loads compare_loads()
我得到以下结果:
Timer unit: 1e-06 s
Total time: 13.9061 s
File: /Users/jakevdp/dask_load.py
Function: compare_loads at line 6
Line # Hits Time Per Hit % Time Line Contents
==============================================================
6 def compare_loads():
7 1 4746788 4746788.0 34.1 df = pd.read_csv('test.csv')
8 1 769303 769303.0 5.5 df_sparse = df.to_sparse(fill_value=0)
9
10 1 33992 33992.0 0.2 df_dask = ddf.read_csv('test.csv', blocksize=10E6)
11 1 7848 7848.0 0.1 df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
12 1 8348217 8348217.0 60.0 df_dask = df_dask.compute().reset_index(drop=True)
我们看到大约 60% 的时间用于 dask 调用,而大约 40% 的时间用于上述示例数组的 pandas 调用。这告诉我们,对于这个任务,dask 比 pandas 慢大约 50%:这是意料之中的,因为数据分区的分块和重组会导致一些额外的开销。
dask 的亮点在于内存使用:让我们使用mprun
逐行的内存配置文件:
%load_ext memory_profiler
%mprun -f compare_loads compare_loads()
我机器上的结果是这样的:
Filename: /Users/jakevdp/dask_load.py
Line # Mem usage Increment Line Contents
================================================
6 70.9 MiB 70.9 MiB def compare_loads():
7 691.5 MiB 620.6 MiB df = pd.read_csv('test.csv')
8 828.8 MiB 137.3 MiB df_sparse = df.to_sparse(fill_value=0)
9
10 806.3 MiB -22.5 MiB df_dask = ddf.read_csv('test.csv', blocksize=10E6)
11 806.4 MiB 0.1 MiB df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
12 947.9 MiB 141.5 MiB df_dask = df_dask.compute().reset_index(drop=True)
我们看到最终的 pandas 数据帧大小约为 140MB,但 pandas 在将数据读入临时密集对象时使用了约 620MB。
另一方面,dask 在加载数组和构建最终稀疏结果时总共只使用了大约 140MB。如果您正在读取其密集大小与系统上可用内存相当的数据,dask 具有明显的优势,尽管计算时间慢了约 50%。
但对于处理大数据,您不应止步于此。大概您正在对数据进行一些操作,而 dask 数据框抽象允许您在实现数据之前执行这些操作(即将它们添加到“配方”中)。因此,如果您对数据所做的事情涉及算术、聚合、分组等,您甚至不需要担心稀疏存储:只需对 dask 对象执行这些操作,最后调用compute()
,dask 将占用关心以内存有效的方式应用它们。
因此,例如,我可以max()
使用 dask 数据框计算每列的 ,而不必一次将整个内容加载到内存中:
>>> data.max().compute()
x 5.38114
y 5.33796
z 5.25661
txt j
dtype: object
直接使用 dask 数据帧可以让您避免对数据表示的担忧,因为您可能永远不必一次将所有数据加载到内存中。
祝你好运!