5

首先免责声明:我对这两个主题(python 和镶木地板)都很陌生,所以如果我的想法很复杂,请与我联系。

我正在寻找有关如何以最有效的方式最好地完成以下转换的一些指导:

我有一个平面 parquet 文件,其中一个 varchar 列将 JSON 数据存储为字符串,我想将此数据转换为嵌套结构,即 JSON 数据变为嵌套 parquet。如果这有任何帮助,我会提前知道 JSON 的模式。

到目前为止,这是我“完成”的事情:


构建样本数据

# load packages

import pandas as pd
import json
import pyarrow as pa
import pyarrow.parquet as pq

# Create dummy data

# dummy data with JSON as string
person_data = {'Name':  ['Bob'],
        'Age': [25],
        'languages': "{'mother_language': 'English', 'other_languages': ['German', 'French']}"     
        }

# from dict to panda df
person_df = pd.DataFrame.from_dict(person_data)

# from panda df to pyarrow table
person_pat = pa.Table.from_pandas(person_df)

# save as parquet file
pq.write_table(person_pat, 'output/example.parquet')

剧本提案

# load dummy data
sample = pa.parquet.read_table('output/example.parquet')

# transform to dict
sample_dict = sample.to_pydict()
# print with indent for checking
print(json.dumps(sample_dict, sort_keys=True, indent=4))
# load json from string and replace string
sample_dict['languages'] = json.loads(str(sample_dict['languages']))
print(json.dumps(sample_dict, sort_keys=True, indent=4))
#type(sample_dict['languages'])

# how to keep the nested structure when going from dict —> panda df —> pyarrow table?
# save dict as nested parquet...

所以,我这里是我的具体问题:

  1. 这种方法是可行的方法还是可以以任何方式进行优化?dict、df 和 pa table 之间的所有转换都感觉效率不高,很高兴在这里接受教育。
  2. 执行 dict 时如何保留嵌套结构 —> df变换?或者这根本不需要?
  3. 编写嵌套拼花文件的最佳方法是什么?我已经用 Python 阅读了 Parquet 中的嵌套数据,这里提到了快速 Parquet 以供阅读,但缺乏写作能力 - 同时有任何可行的解决方案吗?

非常感谢斯蒂芬

4

1 回答 1

4

PySpark 可以用一种简单的方式来完成,如下所示。使用 PySpark 的主要好处是随着数据的增长,基础设施的可扩展性,但是使用普通的 Python 可能会出现问题,就像你不使用像 Dask 这样的框架一样,你将需要更大的机器来运行它。

from pyspark.sql import HiveContext
hc = HiveContext(sc)

# This is a way to create a PySpark dataframe from your sample, but there are others 
nested_df = hc.read.json(sc.parallelize(["""
{'Name':  ['Bob'],
        'Age': [25],
        'languages': "{'mother_language': 'English', 'other_languages': ['German', 'French']}"     
        }
"""]))

# You have nested Spark dataframe here. This shows the content of the spark dataframe. 20 is the max number of rows to show on the console and False means don't cut the columns that don't fit on the screen (show all columns content)
nested_df.show(20,False)

# Writes to a location as parquet
nested_df.write.parquet('/path/parquet')

# Reads the file from the previous location
spark.read.parquet('/path/parquet').show(20, False)

这段代码的输出是

+----+-----+-----------------------------------------------------------------------+
|Age |Name |languages                                                              |
+----+-----+-----------------------------------------------------------------------+
|[25]|[Bob]|{'mother_language': 'English', 'other_languages': ['German', 'French']}|
+----+-----+-----------------------------------------------------------------------+

+----+-----+-----------------------------------------------------------------------+
|Age |Name |languages                                                              |
+----+-----+-----------------------------------------------------------------------+
|[25]|[Bob]|{'mother_language': 'English', 'other_languages': ['German', 'French']}|
+----+-----+-----------------------------------------------------------------------+

回答您的问题

  1. 我认为这更有效,因为如果您可以在 Spark 中使用更多执行器,那么您拥有多少数据并不重要
  2. 你可以看到,当 parquet 文件被加载时,所有的 dict 和列表都被保留了
  3. 这取决于“最佳”的定义,但我认为这是一个不错的选择;)
于 2020-07-15T21:08:50.710 回答