For Spark < 2.4
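You can create two DataFrames: one with the name and value columns, and another with "EPID" as the name and the epid column as the value. Union them, aggregate the pairs with collect_set, and build the JSON string with to_json. The code looks like this: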
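import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

// Creating test data
val df = Seq(("xxx","yyy" ,"EAN" ,"5057723043","1299"), ("xxx","yyy" ,"MPN" ,"EVBD", "1299") )
  .toDF("id1", "id2", "name", "value", "epid")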
df.show(false)
+---+---+----+----------+----+
|id1|id2|name|value     |epid|
+---+---+----+----------+----+
|xxx|yyy|EAN |5057723043|1299|
|xxx|yyy|MPN |EVBD      |1299|
+---+---+----+----------+----+
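// df1: one (name, value) struct per input row
val df1 = df.withColumn("map", struct(col("name"), col("value")))
  .select("id1", "id2", "map")
// df2: the epid column turned into an ("EPID", epid) struct with the same field names.
// union matches columns by position, so both sides select the same columns in the same order.
val df2 = df.withColumn("map", struct(lit("EPID").as("name"), col("epid").as("value")))
  .select("id1", "id2", "map")
// Gather the structs per (id1, id2); collect_set drops duplicates such as the repeated
// EPID pair, but the order of elements in the resulting array is not guaranteed.
val jsonDF = df1.union(df2).groupBy("id1", "id2")
  .agg(collect_set("map").as("item_specifics"))
  .withColumn("json", to_json(struct("id1", "id2", "item_specifics")))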
jsonDF.select("json").show(false)
+---------------------------------------------------------------------------------------------------------------------------------------------+
|json |
+---------------------------------------------------------------------------------------------------------------------------------------------+
|{"id1":"xxx","id2":"yyy","item_specifics":[{"name":"MPN","value":"EVBD"},{"name":"EAN","value":"5057723043"},{"name":"EPID","value":"1299"}]}|
+---------------------------------------------------------------------------------------------------------------------------------------------+
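To see what the union + groupBy + collect_set pipeline computes for each (id1, id2) pair, here is a small plain-Scala model of it with no Spark involved. It is only a sketch for intuition, not the Spark API: InputRow, Spec and ItemSpecificsModel are hypothetical names, distinct stands in for collect_set (it keeps first-seen order, whereas collect_set makes no ordering guarantee), and the hand-rolled JSON assumes values contain no quotes or backslashes, which to_json would escape.

```scala
// Plain-Scala model (not Spark) of the union + groupBy + collect_set + to_json pipeline.
// InputRow, Spec and ItemSpecificsModel are hypothetical names for illustration only.
final case class InputRow(id1: String, id2: String, name: String, value: String, epid: String)
final case class Spec(name: String, value: String)

object ItemSpecificsModel {
  type Key = (String, String)

  // Counterpart of df1: one (name, value) pair per input row
  def nameValuePairs(rows: Seq[InputRow]): Seq[(Key, Spec)] =
    rows.map(r => ((r.id1, r.id2), Spec(r.name, r.value)))

  // Counterpart of df2: the epid column turned into an ("EPID", epid) pair
  def epidPairs(rows: Seq[InputRow]): Seq[(Key, Spec)] =
    rows.map(r => ((r.id1, r.id2), Spec("EPID", r.epid)))

  // union, group by (id1, id2), then drop duplicates like collect_set
  // (distinct keeps first-seen order; collect_set does not promise any order)
  def itemSpecifics(rows: Seq[InputRow]): Map[Key, Seq[Spec]] =
    (nameValuePairs(rows) ++ epidPairs(rows))
      .groupBy(_._1)
      .map { case (key, pairs) => key -> pairs.map(_._2).distinct }

  // Naive JSON rendering; assumes no quotes or backslashes in the values
  def toJson(key: Key, specs: Seq[Spec]): String = {
    val items = specs.map(s => s"""{"name":"${s.name}","value":"${s.value}"}""").mkString(",")
    s"""{"id1":"${key._1}","id2":"${key._2}","item_specifics":[$items]}"""
  }
}
```

For the two sample rows, itemSpecifics returns a single key ("xxx", "yyy") whose list holds the EAN, MPN and EPID pairs, the same three objects as in the JSON output above, up to ordering.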
For Spark >= 2.4
Spark 2.4 adds an array_union function, which should let you do this without the DataFrame union. I haven't tested this version.
val jsonDF = df.withColumn("map1", struct(col("name"), col("value")))
  .withColumn("map2", struct(lit("EPID").as("name"), col("epid").as("value"))) // "EPID" to match the output above
  .groupBy("id1", "id2")
  .agg(collect_set("map1").as("item_specifics1"), // (name, value) pairs per group
    collect_set("map2").as("item_specifics2"))    // (EPID, epid) pairs per group
  .withColumn("item_specifics", array_union(col("item_specifics1"), col("item_specifics2"))) // merge, dropping duplicates
  .withColumn("json", to_json(struct("id1", "id2", "item_specifics"))) // serialize the merged array
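To check the intuition behind the array_union variant, here is a plain-Scala model of a single (id1, id2) group, with no Spark involved. arrayUnion assumes Spark's documented semantics for array_union (all elements of both arrays, without duplicates); ArrayUnionModel and the (name, value, epid) tuple input are hypothetical, and distinct again stands in for collect_set while keeping first-seen order, which Spark does not guarantee.

```scala
// Plain-Scala model (not Spark) of the array_union variant for ONE (id1, id2) group.
// ArrayUnionModel and the (name, value, epid) tuple input are hypothetical.
object ArrayUnionModel {
  // Assumed to mirror Spark's array_union: elements of both arrays, duplicates removed
  def arrayUnion[A](left: Seq[A], right: Seq[A]): Seq[A] = (left ++ right).distinct

  def itemSpecifics(rows: Seq[(String, String, String)]): Seq[(String, String)] = {
    // Stand-in for collect_set("map1"): distinct (name, value) pairs
    val specifics1 = rows.map { case (name, value, _) => (name, value) }.distinct
    // Stand-in for collect_set("map2"): distinct ("EPID", epid) pairs
    val specifics2 = rows.map { case (_, _, epid) => ("EPID", epid) }.distinct
    arrayUnion(specifics1, specifics2)
  }
}
```

With the two sample rows, the EPID pair appears only once even though both rows carry epid 1299, so the merged array has the same three entries as the union-based version.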