Here is an alternative; although I also like the approach above, I have not tested it. This solution takes performance into account, and I added my own data:
import spark.implicits._
import org.apache.spark.sql.functions._
val distinctUDF = udf( (s: Seq[String]) => s.distinct ) // courtesy of LeoC
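// Note (assumption): on Spark 2.4+ the built-in array_distinct() from
// org.apache.spark.sql.functions could replace this UDF; the UDF is kept as-is below.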
val df = Seq( (1, Array("l1", "l2", "l700")),
              (2, Array("l5", "l6", "l3")),
              (3, Array("l4", "l5")),
              (4, Array("l8", "l10")),
              (5, Array("l7", "l8", "l1", "l700")) ).toDF("k", "lv")
val df2 = df.withColumn("lv", explode($"lv")).repartition($"lv") // defaults to 200 partitions (spark.sql.shuffle.partitions)
// collect_set() keeps distinct elements, while collect_list() keeps all elements (except nulls)
val df3 = df2.groupBy("lv").agg(collect_list("k").as("kv"))
val df4 = df3.filter(size($"kv") > 1).select("kv").distinct
val df5 = df4.withColumn("j", explode($"kv"))
val df6 = df5.join(df, (df5("j") === df("k")))
val df7 = df6.groupBy("kv").agg(collect_set("lv").as("lv"))
df7.withColumn("key", array_min($"kv")).withColumn("values", distinctUDF(flatten($"lv"))).select("key", "values").show(false)
// You can order output as you wish and fusing of lazy eval code takes place
Results (for this data set):
+---+-----------------------+
|key|values |
+---+-----------------------+
|2 |[l4, l5, l6, l3] |
|1 |[l1, l2, l700, l7, l8] |
|4 |[l8, l10, l7, l1, l700]|
+---+-----------------------+
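As a side note, here is an untested sketch of only the final step, assuming Spark 2.4 or later: the built-in array_distinct could replace distinctUDF, and an explicit orderBy (as the comment above suggests) makes the output order deterministic. It reuses df7 and the imports from above:

// Sketch only, assuming Spark 2.4+ for the built-in array_distinct
df7.withColumn("key", array_min($"kv"))
   .withColumn("values", array_distinct(flatten($"lv")))
   .select("key", "values")
   .orderBy("key")
   .show(false)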