1

我想先将流写入箭头文件,然后再将其读回熊猫数据帧,尽可能少的内存开销。

批量写入数据效果很好:

import pyarrow as pa
import pandas as pd
import random

data = [pa.array([random.randint(0, 1000)]), pa.array(['B']), pa.array(['C'])]
columns = ['A','B','C']
batch = pa.RecordBatch.from_arrays(data, columns)

with pa.OSFile('test.arrow', 'wb') as f:
    with pa.RecordBatchStreamWriter(f, batch.schema) as writer:
        for i in range(1000 * 1000):
            data = [pa.array([random.randint(0, 1000)]), pa.array(['B']), pa.array(['C'])]
            batch = pa.RecordBatch.from_arrays(data, columns)
            writer.write_batch(batch)

如上所述写入 100 万行速度很快,并且在整个写入过程中使用大约 40MB 内存。这很好。

但是,在生成大约 118MB 的最终数据帧之前,由于内存消耗高达 2GB,因此回读并不好。

我试过这个:

with pa.input_stream('test.arrow') as f:
    reader = pa.BufferReader(f.read())
    table = pa.ipc.open_stream(reader).read_all()
    df1 = table.to_pandas(split_blocks=True, self_destruct=True)

而这个,具有相同的内存开销:

with open('test.arrow', 'rb') as f:
   df1 = pa.ipc.open_stream(f).read_pandas()

数据框大小:

print(df1.info(memory_usage='deep'))

Data columns (total 3 columns):
 #   Column  Non-Null Count    Dtype
---  ------  --------------    -----
 0   A       1000000 non-null  int64
 1   B       1000000 non-null  object
 2   C       1000000 non-null  object
dtypes: int64(1), object(2)
memory usage: 118.3 MB
None

我需要的是使用 pyarrow 修复内存使用情况,或者建议我可以使用哪种其他格式来增量写入数据,然后将所有数据读入 pandas 数据帧,而不会产生太多内存开销。

4

1 回答 1

2

您的示例在单行中使用了许多 RecordBatches。除了数据(模式、潜在的填充/对齐)之外,这样的 RecordBatch 还具有一些开销,因此对于仅存储单行来说效率不高。

使用read_all()or读取文件时read_pandas(),它首先创建所有这些 RecordBatch,然后将它们转换为单个表。然后开销加起来,这就是你所看到的。

RecordBatch 的推荐大小当然取决于具体的用例,但典型的大小是 64k 到 1M 行。


要查看填充到每个数组 64 个字节的效果(https://arrow.apache.org/docs/format/Columnar.html#buffer-alignment-and-padding),让我们检查分配的总字节数与实际字节数由 RecordBatch 表示:

import pyarrow as pa
 
batch = pa.RecordBatch.from_arrays(
    [pa.array([1]), pa.array(['B']), pa.array(['C'])],
    ['A','B','C']
)

# The size of the data stored in the RecordBatch
# 8 for the integer (int64), 9 for each string array (8 for the len-2 offset array (int32), 1 for the single string byte)
>>> batch.nbytes
26

# The size of the data actually being allocated by Arrow
# (5*64 for 5 buffers padded to 64 bytes)
>>> pa.total_allocated_bytes()
320

所以你可以看到,仅仅这个填充就已经给一个小的 RecordBatch 带来了很大的开销

于 2021-05-17T13:16:03.887 回答