Demo of appending a Pandas dataframe to an existing .parquet file.
Note: Other answers cannot append to an existing .parquet file. This one can; see the discussion at the end.
Tested on Python v3.9 on Windows and Linux.
Install PyArrow using pip:
pip install pyarrow==6.0.1
or with Anaconda/Miniconda:
conda install -c conda-forge pyarrow=6.0.1 -y
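To verify the installed version from Python:

import pyarrow
print(pyarrow.__version__)   # Should print "6.0.1".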
Demo code:
# Q. Demo?
# A. Demo of appending to an existing .parquet file by memory mapping the original file, appending the new dataframe, then writing the new file out.
import os
from pathlib import Path
from typing import Union

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
filepath = "parquet_append.parquet"
Method 1 of 2
The simple way: using pandas, read the original .parquet file in, append, write the entire file back out.
# Create parquet file.
df = pd.DataFrame({"x": [1.,2.,np.nan], "y": ["a","b","c"]}) # Create dataframe ...
df.to_parquet(filepath) # ... write to file.
# Append to original parquet file.
df = pd.read_parquet(filepath) # Read original ...
df2 = pd.DataFrame({"x": [3.,4.,np.nan], "y": ["d","e","f"]}) # ... create new dataframe to append ...
df3 = pd.concat([df, df2]) # ... concatenate together ...
df3.to_parquet(filepath) # ... overwrite original file.
# Demo that new data frame has been appended to old.
df_copy = pd.read_parquet(filepath)
print(df_copy)
# x y
# 0 1.0 a
# 1 2.0 b
# 2 NaN c
# 0 3.0 d
# 1 4.0 e
# 2 NaN f
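(Note that pd.concat() preserves the original row indices, which is why 0, 1, 2 repeat above; pass ignore_index=True to pd.concat() if a fresh 0..n-1 index is preferred.)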
Method 2 of 2
More complex but faster: using native PyArrow calls, memory-map the original file, append the new dataframe, then write out a new file.
# Write initial file using PyArrow.
df = pd.DataFrame({"x": [1.,2.,np.nan], "y": ["a","b","c"]}) # Create dataframe ...
table = pa.Table.from_pandas(df)
pq.write_table(table, where=filepath)
def parquet_append(filepath: Union[Path, str], df: pd.DataFrame) -> None:
"""
Append dataframe to existing .parquet file. Reads original .parquet file in, appends new dataframe, writes new .parquet file out.
:param filepath: Filepath for parquet file.
:param df: Pandas dataframe to append. Must be same schema as original.
"""
table_original_file = pq.read_table(source=filepath, pre_buffer=False, use_threads=True, memory_map=True) # Use memory map for speed.
table_to_append = pa.Table.from_pandas(df)
table_to_append = table_to_append.cast(table_original_file.schema) # Attempt to cast new schema to existing, e.g. datetime64[ns] to datetime64[us] (may throw otherwise).
handle = pq.ParquetWriter(filepath, table_original_file.schema) # Overwrite old file with empty. WARNING: PRODUCTION LEVEL CODE SHOULD BE MORE ATOMIC: WRITE TO A TEMPORARY FILE, DELETE THE OLD, RENAME. THEN FAILURES WILL NOT LOSE DATA.
handle.write_table(table_original_file)
handle.write_table(table_to_append)
handle.close() # Writes binary footer. Until this occurs, .parquet file is not usable.
# Append to original parquet file.
df = pd.DataFrame({"x": [3.,4.,np.nan], "y": ["d","e","f"]}) # ... create new dataframe to append ...
parquet_append(filepath, df)
# Demo that new data frame has been appended to old.
df_copy = pd.read_parquet(filepath)
print(df_copy)
# x y
# 0 1.0 a
# 1 2.0 b
# 2 NaN c
# 0 3.0 d
# 1 4.0 e
# 2 NaN f
Discussion
The answers by @Ibraheem Ibraheem and @yardstick17 cannot be used to append to an existing .parquet file:

- Limitation 1: After .close() is called, the file cannot be appended to. Once the footer is written, everything is set in stone;
- Limitation 2: The .parquet file cannot be read by any other program until .close() is called (it will throw an exception, as the binary footer is missing).

Combined, these limitations mean that they cannot be used to append to an existing .parquet file; they can only be used to write a .parquet file in chunks. The technique above removes these limitations, at the expense of being less efficient, as the entire file must be rewritten to append to the end. After extensive research, I believe it is not possible to append to an existing .parquet file with the existing PyArrow libraries (as of v6.0.1).
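For contrast, a minimal sketch of the chunk-writing pattern those answers use (the filename chunked.parquet is illustrative); every chunk must be written in the same session, before .close():

# Write a .parquet file in chunks with ParquetWriter. All chunks must be
# written before close(); the file cannot be reopened for appending later.
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df1 = pd.DataFrame({"x": [1., 2.], "y": ["a", "b"]})
df2 = pd.DataFrame({"x": [3., 4.], "y": ["c", "d"]})
schema = pa.Table.from_pandas(df1).schema
with pq.ParquetWriter("chunked.parquet", schema) as writer:  # Context manager calls .close() on exit.
    writer.write_table(pa.Table.from_pandas(df1))  # Chunk 1.
    writer.write_table(pa.Table.from_pandas(df2))  # Chunk 2.
# Footer is written on close; only now is the file readable, and it is final.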
The above could be modified to merge multiple .parquet files in a folder into a single .parquet file.
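A minimal sketch of that modification, assuming all files in the folder share a compatible schema (parquet_merge and its parameters are illustrative names):

# Merge every .parquet file in a folder into a single .parquet file.
# Assumes all input files share a compatible schema.
from pathlib import Path
import pyarrow.parquet as pq

def parquet_merge(folder: Path, out_filepath: Path) -> None:
    files = sorted(folder.glob("*.parquet"))
    schema = pq.read_table(files[0]).schema  # Take the schema from the first file.
    with pq.ParquetWriter(out_filepath, schema) as writer:
        for f in files:
            writer.write_table(pq.read_table(f, memory_map=True).cast(schema))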
It would also be possible to perform an efficient upsert: pq.read_table() has filters on columns and rows, so if the rows in the original table were filtered out on load, the rows in the new table would effectively replace the old. This would be more useful with timeseries data.
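A sketch of that upsert for timeseries data, assuming a hypothetical timestamp column "ts" and the policy that everything from the new dataframe's start time onwards is replaced:

# Upsert: replace all original rows from the new dataframe's start time onwards.
# The column name "ts" and the replacement policy are illustrative.
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

def parquet_upsert(filepath: str, df: pd.DataFrame) -> None:
    table_old = pq.read_table(
        source=filepath,
        filters=[("ts", "<", df["ts"].min())],  # Keep only rows before the new data.
        memory_map=True)
    table_new = pa.Table.from_pandas(df).cast(table_old.schema)
    with pq.ParquetWriter(filepath, table_old.schema) as writer:
        writer.write_table(table_old)  # Surviving old rows ...
        writer.write_table(table_new)  # ... followed by the new rows.

As with parquet_append() above, production code should write to a temporary file and rename, so a failure mid-write cannot lose data.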