这可以使用 udf 和 explode 来完成。
但是，我们不知道 map 中值的类型（class），因为该信息是推断出来的，无法作为显式的类来引用。为了解决这个问题，我们可以创建一个具有相同类签名的 case class，作为推断类的“影子”（shadow）。这样 Spark 会把这两个类视为相同，因为推断出的类和我们的影子类都会被转换为相同的 StructType。
下面是一个示例（其中的 case class value 用来代替我们并不知道的推断类）。
scala> case class value(created: Long, lastModified: Long)
defined class value
scala> val myDF = Seq((1, Map("a" -> value(1L,2L), "b" -> value(3L,4L))), (2, Map("c" -> value(5L,6L), "d" -> value(6L,7L)))).toDF("id", "stuff")
myDF: org.apache.spark.sql.DataFrame = [id: int, stuff: map<string,struct<created:bigint,lastModified:bigint>>]
scala> myDF.show
+---+--------------------+
| id| stuff|
+---+--------------------+
| 1|Map(a -> [1,2], b...|
| 2|Map(c -> [5,6], d...|
+---+--------------------+
scala> myDF.printSchema
root
|-- id: integer (nullable = false)
|-- stuff: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- created: long (nullable = false)
| | |-- lastModified: long (nullable = false)
scala> case class shadowOfValue(created: Long, lastModified: Long)
defined class shadowOfValue
scala> val explodeUDF = udf( (map: Map[String, shadowOfValue]) => map.toVector)
explodeUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StructType(StructField(_1,StringType,true), StructField(_2,StructType(StructField(created,LongType,false), StructField(lastModified,LongType,false)),true)),true),Some(List(MapType(StringType,StructType(StructField(created,LongType,false), StructField(lastModified,LongType,false)),true))))
scala> var newDF = myDF.withColumn("TMP", explode(explodeUDF($"stuff"))).drop("stuff")
newDF: org.apache.spark.sql.DataFrame = [id: int, TMP: struct<_1: string, _2: struct<created: bigint, lastModified: bigint>>]
scala> newDF = newDF.withColumn("key", $"TMP".apply("_1")).withColumn("value", $"TMP".apply("_2"))
newDF: org.apache.spark.sql.DataFrame = [id: int, TMP: struct<_1: string, _2: struct<created: bigint, lastModified: bigint>> ... 2 more fields]
scala> newDF = newDF.drop("TMP")
newDF: org.apache.spark.sql.DataFrame = [id: int, key: string ... 1 more field]
scala> newDF.show
+---+---+-----+
| id|key|value|
+---+---+-----+
| 1| a|[1,2]|
| 1| b|[3,4]|
| 2| c|[5,6]|
| 2| d|[6,7]|
+---+---+-----+
scala> newDF.printSchema
root
|-- id: integer (nullable = false)
|-- key: string (nullable = true)
|-- value: struct (nullable = true)
| |-- created: long (nullable = false)
| |-- lastModified: long (nullable = false)