39

你如何附加/更新到一个parquet文件pyarrow

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


 table2 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
 table3 = pd.DataFrame({'six': [-1, np.nan, 2.5], 'nine': ['foo', 'bar', 'baz'], 'ten': [True, False, True]})


pq.write_table(table2, './dataNew/pqTest2.parquet')
#append pqTest2 here?  

我在文档中没有找到关于附加镶木地板文件的任何内容。而且,您可以使用pyarrow 多处理来插入/更新数据吗?

4

4 回答 4

38

我遇到了同样的问题,我想我可以使用以下方法解决它:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


chunksize=10000 # this is the number of lines

pqwriter = None
for i, df in enumerate(pd.read_csv('sample.csv', chunksize=chunksize)):
    table = pa.Table.from_pandas(df)
    # for the first chunk of records
    if i == 0:
        # create a parquet write object giving it an output file
        pqwriter = pq.ParquetWriter('sample.parquet', table.schema)            
    pqwriter.write_table(table)

# close the parquet writer
if pqwriter:
    pqwriter.close()
于 2017-12-15T20:10:46.977 回答
15

在您的情况下,列名不一致,我使三个示例数据框的列名一致,以下代码对我有用。

# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


def append_to_parquet_table(dataframe, filepath=None, writer=None):
    """Method writes/append dataframes in parquet format.

    This method is used to write pandas DataFrame as pyarrow Table in parquet format. If the methods is invoked
    with writer, it appends dataframe to the already written pyarrow table.

    :param dataframe: pd.DataFrame to be written in parquet format.
    :param filepath: target file location for parquet file.
    :param writer: ParquetWriter object to write pyarrow tables in parquet format.
    :return: ParquetWriter object. This can be passed in the subsequenct method calls to append DataFrame
        in the pyarrow Table
    """
    table = pa.Table.from_pandas(dataframe)
    if writer is None:
        writer = pq.ParquetWriter(filepath, table.schema)
    writer.write_table(table=table)
    return writer


if __name__ == '__main__':

    table1 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    table2 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    table3 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    writer = None
    filepath = '/tmp/verify_pyarrow_append.parquet'
    table_list = [table1, table2, table3]

    for table in table_list:
        writer = append_to_parquet_table(table, filepath, writer)

    if writer:
        writer.close()

    df = pd.read_parquet(filepath)
    print(df)

输出:

   one  three  two
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz
于 2018-02-02T06:53:36.963 回答
12

一般来说,Parquet 数据集由多个文件组成,因此您可以通过将附加文件写入数据所属的同一目录来追加。能够轻松连接多个文件会很有用。我打开了https://issues.apache.org/jira/browse/PARQUET-1154以使这可以在 C++(以及 Python)中轻松完成

于 2017-11-04T19:26:16.807 回答
0

将 Pandas 数据框附加到现有 .parquet 文件的演示。

注意:其他答案不能附加到现有的 .parquet 文件中。这个可以; 见最后讨论。

在 Windows 和 Linux 上的 Python v3.9 上测试。

使用 pip 安装 PyArrow:

pip install pyarrow==6.0.1

Anaconda / Miniconda

conda install -c conda-forge pyarrow=6.0.1 -y

演示代码:

# Q. Demo?
# A. Demo of appending to an existing .parquet file by memory mapping the original file, appending the new dataframe, then writing the new file out.

import os
import numpy as np
import pandas as pd
import pyarrow as pa  
import pyarrow.parquet as pq  

filepath = "parquet_append.parquet"

方法 1 之 2

简单的方法:使用熊猫,读入原始的.parquet文件,追加,写回整个文件。

# Create parquet file.
df = pd.DataFrame({"x": [1.,2.,np.nan], "y": ["a","b","c"]})  # Create dataframe ...
df.to_parquet(filepath)  # ... write to file.

# Append to original parquet file.
df = pd.read_parquet(filepath)  # Read original ...
df2 = pd.DataFrame({"x": [3.,4.,np.nan], "y": ["d","e","f"]})  # ... create new dataframe to append ...
df3 = pd.concat([df, df2])  # ... concatenate together ...
df3.to_parquet(filepath)  # ... overwrite original file.

# Demo that new data frame has been appended to old.
df_copy = pd.read_parquet(filepath)
print(df_copy)
#      x  y
# 0  1.0  a
# 1  2.0  b
# 2  NaN  c
# 0  3.0  d
# 1  4.0  e
# 2  NaN  f

方法 2 之 2

更复杂但更快:使用原生 PyArrow 调用,内存映射原始文件,追加新数据帧,写出新文件。

# Write initial file using PyArrow.
df = pd.DataFrame({"x": [1.,2.,np.nan], "y": ["a","b","c"]})  # Create dataframe ...
table = pa.Table.from_pandas(df)
pq.write_table(table, where=filepath)

def parquet_append(filepath:Path or str, df: pd.DataFrame) -> None:
    """
    Append to dataframe to existing .parquet file. Reads original .parquet file in, appends new dataframe, writes new .parquet file out.
    :param filepath: Filepath for parquet file.
    :param df: Pandas dataframe to append. Must be same schema as original.
    """
    table_original_file = pq.read_table(source=filepath,  pre_buffer=False, use_threads=True, memory_map=True)  # Use memory map for speed.
    table_to_append = pa.Table.from_pandas(df)
    table_to_append = table_to_append.cast(table_original_file.schema)  # Attempt to cast new schema to existing, e.g. datetime64[ns] to datetime64[us] (may throw otherwise).
    handle = pq.ParquetWriter(filepath, table_original_file.schema)  # Overwrite old file with empty. WARNING: PRODUCTION LEVEL CODE SHOULD BE MORE ATOMIC: WRITE TO A TEMPORARY FILE, DELETE THE OLD, RENAME. THEN FAILURES WILL NOT LOSE DATA.
    handle.write_table(table_original_file)
    handle.write_table(table_to_append)
    handle.close()  # Writes binary footer. Until this occurs, .parquet file is not usable.

# Append to original parquet file.
df = pd.DataFrame({"x": [3.,4.,np.nan], "y": ["d","e","f"]})  # ... create new dataframe to append ...
parquet_append(filepath, df)

# Demo that new data frame has been appended to old.
df_copy = pd.read_parquet(filepath)
print(df_copy)
#      x  y
# 0  1.0  a
# 1  2.0  b
# 2  NaN  c
# 0  3.0  d
# 1  4.0  e
# 2  NaN  f

讨论

@Ibraheem Ibraheem 和 @yardstick17 的答案不能用于附加到现有的 .parquet 文件:

  • 限制一:.close()调用后,文件无法追加。写完页脚后,一切都一成不变;
  • 限制 2:.parquet 文件在被调用之前无法被任何其他程序读取.close()(由于缺少二进制页脚,它将引发异常)。

结合起来,这些限制意味着它们不能用于附加到现有的 .parquet 文件,它们只能用于以块的形式写入 .parquet 文件。上述技术消除了这些限制,但效率较低,因为必须重写整个文件以追加到末尾。经过广泛的研究,我相信不可能使用现有的 PyArrow 库(从 v6.0.1 开始)附加到现有的 .parquet 文件。

可以对其进行修改以将文件夹中的多个 .parquet 文件合并为一个 .parquet 文件。

可以执行有效的 upsert:pq.read_table() 对列和行有过滤器,因此如果在加载时过滤掉原始表中的行,新表中的行将有效地替换旧表中的行。这对于时间序列数据会更有用。

于 2022-01-22T22:33:45.910 回答