3

我正在尝试将这样的 Dataframe 写入 Parquet:

| foo | bar               |
|-----|-------------------|
|  1  | {"a": 1, "b": 10} |
|  2  | {"a": 2, "b": 20} |
|  3  | {"a": 3, "b": 30} |

我正在使用 Pandas 和 Fastparquet:

df = pd.DataFrame({
    "foo": [1, 2, 3],
    "bar": [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}]
})

import fastparquet
fastparquet.write('/my/parquet/location/toy-fastparquet.parq', df)

我想在 (py)Spark 中加载 Parquet,并使用 Spark SQL 查询数据,例如:

df = spark.read.parquet("/my/parquet/location/")
df.registerTempTable('my_toy_table')
result = spark.sql("SELECT * FROM my_toy_table WHERE bar.b > 15")

我的问题是,即使fastparquet可以正确读取其 Parquet 文件(该bar字段被正确反序列化为 Struct),在 Spark 中bar也被读取为 String 类型的列,它只包含原始结构的JSON 表示:

In [2]: df.head()                                                                                                                                                                                           
Out[2]: Row(foo=1, bar='{"a": 1, "b": 10}')

我尝试从 PyArrow 编写 Parquet,但没有运气:ArrowNotImplementedError: Level generation for Struct not supported yet. 我也尝试过传递file_scheme='hive'给 Fastparquet,但我得到了相同的结果。将 Fastparquet 序列化更改为 BSON ( object_encoding='bson') 会产生不可读的二进制字段。

[编辑]我看到以下方法:

  • [已回答]从 Spark 编写 Parquet
  • [open]找到一个 Python 库,它实现了Parquet 的嵌套类型规范,并且与 Spark 读取它们的方式兼容
  • [已回答]使用特定的 JSON 反序列化读取 Spark 中的 Fastparquet 文件(我想这会对性能产生影响)
  • 不要完全使用嵌套结构
4

1 回答 1

4

您在这里至少有 3 个选项:

选项1:

您不需要使用任何额外的库,fastparquet因为 Spark 已经提供了该功能:

pdf = pd.DataFrame({
    "foo": [1, 2, 3],
    "bar": [{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}]
})

df = spark.createDataFrame(pdf)
df.write.mode("overwrite").parquet("/tmp/parquet1")

如果尝试使用df = spark.read.parquet("/tmp/parquet1")架构加载数据将是:

StructType([ 
            StructField("foo", LongType(), True),
            StructField("bar",MapType(StringType(), LongType(), True), True)])

正如您在这种情况下所看到的,Spark 将保留正确的架构。

选项 2:

如果出于任何原因仍需要使用fastparquetthenbar将被视为字符串,因此您可以将其加载bar为字符串,然后使用from_json函数将其转换为 JSON。在您的情况下,我们会将 json 作为 Map(string, int) 的字典来处理。这是为了我们自己的方便,因为数据似乎是可以用字典完美表示的键/值序列:

from pyspark.sql.types import StringType, MapType,LongType
from pyspark.sql.functions import from_json

df = spark.read.parquet("/tmp/parquet1")

# schema should be a Map(string, string) 
df.withColumn("bar", from_json("bar", MapType(StringType(), LongType()))).show()

# +---+-----------------+
# |foo|              bar|
# +---+-----------------+
# |  1|[a -> 1, b -> 10]|
# |  2|[a -> 2, b -> 20]|
# |  3|[a -> 3, b -> 30]|
# +---+-----------------+

选项 3:

如果您的架构没有改变,并且您知道 bar 的每个值将始终具有相同的字段组合(a,b),您也可以转换bar为结构:

schema = StructType([ 
                    StructField("a", LongType(), True),
                    StructField("b", LongType(), True)
            ])

df = df.withColumn("bar", from_json("bar", schema))

df.printSchema()

# root
#  |-- foo: long (nullable = true)
#  |-- bar: struct (nullable = true)
#  |    |-- a: long (nullable = true)
#  |    |-- b: long (nullable = true)

例子:

然后你可以运行你的代码:

df.registerTempTable('my_toy_table')

spark.sql("SELECT * FROM my_toy_table WHERE bar.b > 20").show()
# or spark.sql("SELECT * FROM my_toy_table WHERE bar['b'] > 20")

# +---+-----------------+
# |foo|              bar|
# +---+-----------------+
# |  3|[a -> 3, b -> 30]|
# +---+-----------------+
于 2020-02-14T18:12:50.030 回答