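This worked well for me with Spark 3.0.0 and Scala 2.12.10. I used schema_of_json to get the schema of the data in a format that from_json accepts, then applied explode and the * operator as the last step of the chain to expand the array into rows and the struct fields into columns.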
// TO KNOW THE SCHEMA
scala> val str = Seq("""[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]""")
str: Seq[String] = List([{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}])
scala> val df = str.toDF("json")
df: org.apache.spark.sql.DataFrame = [json: string]
scala> df.show()
+--------------------+
|                json|
+--------------------+
|[{"key1":"value1"...|
+--------------------+
scala> val schema = df.select(schema_of_json(df.select(col("json")).first.getString(0))).as[String].first
schema: String = array<struct<key1:string,key2:string>>
Use the generated string, 'array<struct<key1:string,key2:string>>', as the schema, as shown here:
// TO PARSE THE ARRAY OF JSON OBJECTS
scala> val parsedJson1 = df.selectExpr("from_json(json, 'array<struct<key1:string,key2:string>>') as parsed_json")
parsedJson1: org.apache.spark.sql.DataFrame = [parsed_json: array<struct<key1:string,key2:string>>]
scala> parsedJson1.show()
+--------------------+
|         parsed_json|
+--------------------+
|[[value1, value2]...|
+--------------------+
scala> val data = parsedJson1.selectExpr("explode(parsed_json) as json").select("json.*")
data: org.apache.spark.sql.DataFrame = [key1: string, key2: string]
scala> data.show()
+------+------+
|  key1|  key2|
+------+------+
|value1|value2|
|value3|value4|
+------+------+
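Outside the shell, the same steps can be collected into a small standalone program. This is only a sketch under a few assumptions: the names `ParseJsonArray` and `flatten` are made up for illustration, the `local[*]` master is just for trying it out, and the first row is assumed to be non-null and representative of the whole column. Rather than pasting the schema string by hand, it passes the result of `schema_of_json` straight to the `from_json(Column, Column)` overload.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode, from_json, schema_of_json}

// Hypothetical helper object; the names are illustrative, not from the original answer.
object ParseJsonArray {

  // Turns a string column holding a JSON array of objects into one row per object,
  // with one column per key.
  def flatten(df: DataFrame, jsonCol: String): DataFrame = {
    // Assumption: the DataFrame is non-empty and the first value is a
    // representative, non-null sample of the JSON in this column.
    val sample = df.select(col(jsonCol)).first.getString(0)

    df.select(from_json(col(jsonCol), schema_of_json(sample)).as("parsed_json"))
      .select(explode(col("parsed_json")).as("json")) // one row per array element
      .select("json.*")                               // struct fields become columns
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run for experimentation
      .appName("parse-json-array")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      """[{"key1":"value1","key2":"value2"},{"key1":"value3","key2":"value4"}]"""
    ).toDF("json")

    flatten(df, "json").show()
    spark.stop()
  }
}
```

Because the schema is inferred from a single sample, keys that only appear in later rows would be dropped silently; if the data is not uniform, writing the schema out explicitly is the safer choice.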
For reference, without the star expansion, the intermediate result looks like this:
scala> val data = parsedJson1.selectExpr("explode(parsed_json) as json")
data: org.apache.spark.sql.DataFrame = [json: struct<key1: string, key2: string>]
scala> data.show()
+----------------+
|            json|
+----------------+
|[value1, value2]|
|[value3, value4]|
+----------------+
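If the intermediate struct column is not needed, Spark SQL's built-in `inline` generator may be a shorter alternative to explode followed by the star expansion, since it expands an array of structs into rows and columns in one step. This was not part of what I ran above, so treat it as a sketch; `InlineExample` and `flattenWithInline` are made-up names, and the input is assumed to have a `parsed_json` column of type array<struct<...>> like `parsedJson1`.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper; the names are illustrative only.
object InlineExample {

  // Assumes `parsed` has a column named `parsed_json` of type array<struct<...>>.
  // `inline` emits one row per array element and one column per struct field.
  def flattenWithInline(parsed: DataFrame): DataFrame =
    parsed.selectExpr("inline(parsed_json)")
}
```

In the shell this would be called as `InlineExample.flattenWithInline(parsedJson1).show()`.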