I have a dataframe as shown below:

Id    linkedIn
1     [l1,l2]
2     [l5,l6,l3]
3     [l4,l5]
4     [l8,l10]
5     [l7,l9,l1]

Rows 1 and 5 share l1, so these two should be merged into a single row with Id=1. Similarly, rows 2 and 3 have l5 in common, so they should be merged into a single row with Id=2. Row 4 stays as is, since it has no element in common with any other row.

I would like the output to look like this:

Id    linkedIn
1     [l1,l2,l7,l9]
2     [l4,l5,l6,l3]
4     [l8,l10]

I am using Spark 2.3.
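
The rule is presumably transitive: if row A shares an element with row B and row B shares one with row C, all three should collapse into one row. For clarity, here is a minimal plain-Scala union-find sketch of that grouping, independent of Spark (mergeRows and everything inside it are illustrative names, not existing code):

// Union-find over row ids: rows sharing any element end up in one group.
def mergeRows(rows: Seq[(Int, Seq[String])]): Seq[(Int, Seq[String])] = {
  val parent = scala.collection.mutable.Map[Int, Int]()
  def find(x: Int): Int = {
    val p = parent.getOrElse(x, x)
    if (p == x) x else { val r = find(p); parent(x) = r; r } // path compression
  }
  def union(a: Int, b: Int): Unit = parent(find(b)) = find(a)

  val seen = scala.collection.mutable.Map[String, Int]() // element -> first row id holding it
  for ((id, vs) <- rows; v <- vs) seen.get(v) match {
    case Some(first) => union(first, id) // shared element: merge the two rows
    case None        => seen(v) = id
  }
  rows.groupBy { case (id, _) => find(id) }.values.toSeq
    .map(grp => (grp.map(_._1).min, grp.flatMap(_._2).distinct)) // keep smallest Id
    .sortBy(_._1)
}

mergeRows(Seq(1 -> Seq("l1","l2"), 2 -> Seq("l5","l6","l3"), 3 -> Seq("l4","l5"),
              4 -> Seq("l8","l10"), 5 -> Seq("l7","l9","l1")))
// -> Seq((1, Seq(l1, l2, l7, l9)), (2, Seq(l5, l6, l3, l4)), (4, Seq(l8, l10)))
// element order within a group may differ from the expected output above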

2 Answers

Another option (I also like the approach above, but have not tested it). This solution takes performance into account; I added my own data:

import spark.implicits._
import org.apache.spark.sql.functions._

val distinctUDF = udf( (s: Seq[String]) => s.distinct ) // courtesy of LeoC

val df = Seq( (1, Array("l1", "l2", "l700")),
          (2, Array("l5", "l6", "l3")),
          (3, Array("l4", "l5")),
          (4, Array("l8", "l10")),
          (5, Array("l7", "l8", "l1", "l700")) ).toDF("k", "lv")

val df2 = df.withColumn("lv", explode($"lv")).repartition($"lv") // one row per element; 200 shuffle partitions by default

// collect_set() keeps distinct elements; collect_list() keeps all elements (except nulls)
val df3 = df2.groupBy("lv").agg(collect_list("k").as("kv"))  // element -> keys of all rows containing it
val df4 = df3.filter(size($"kv") > 1).select("kv").distinct  // keep only key groups that share an element
val df5 = df4.withColumn("j", explode($"kv"))                // one row per key in each group
val df6 = df5.join(df, (df5("j") === df("k")))               // re-attach the original arrays
val df7 = df6.groupBy("kv").agg(collect_set("lv").as("lv"))  // gather all arrays per merged group

df7.withColumn("key", array_min($"kv")).withColumn("values", distinctUDF(flatten($"lv"))).select("key", "values").show(false) 
// You can order output as you wish and fusing of lazy eval code takes place

The result (for this data set):

+---+-----------------------+
|key|values                 |
+---+-----------------------+
|2  |[l4, l5, l6, l3]       |
|1  |[l1, l2, l700, l7, l8] |
|4  |[l8, l10, l7, l1, l700]|
+---+-----------------------+
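
Note that array_min and flatten were only added in Spark 2.4; since the question targets Spark 2.3, the last step can be expressed with UDFs instead. A sketch (minUDF and flattenUDF are made-up names):

// Spark 2.3 does not ship array_min/flatten; equivalent UDFs (illustrative):
val flattenUDF = udf((xs: Seq[Seq[String]]) => xs.flatten)
val minUDF = udf((ks: Seq[Int]) => ks.min)

df7.withColumn("key", minUDF($"kv"))
   .withColumn("values", distinctUDF(flattenUDF($"lv")))
   .select("key", "values")
   .show(false)
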
answered 2020-05-22T09:42:03.217

If the commonality spans only 2 rows, you can try the code below.

import spark.implicits._
import org.apache.spark.sql.functions._

val df = Seq(
  (1,Seq("l1","l2")),
  (2,Seq("l5","l6","l3")),
  (3,Seq("l4","l5")),
  (4,Seq("l8","l10")),
  (5,Seq("l7","l9","l1")),
  (6,Seq("l20","l10","l1"))
).toDF("id","values")

val df2 = df.select('id,explode('values).as("value"))      // one row per (id, element)
val df3 = df2.join(df2,"value").toDF("value","id","id2")   // pairs of ids sharing an element
// sort before hashing: collect_set gives no ordering guarantee
val df4 = df3.groupBy('id).agg(hash(sort_array(collect_set('id2))).as("hash"))
val df5 = df2.join(df4,"id").groupBy('hash).agg(collect_set('value).as("values")) // merge values per group
val df6 = df5.join(df4.groupBy('hash).agg(min('id).as("id")),"hash").select('id,'values).orderBy('id) // smallest id becomes the group id
df6.show()

Output:

+---+----------------+
| id|          values|
+---+----------------+
|  1|[l7, l9, l2, l1]|
|  2|[l4, l3, l6, l5]|
|  4|       [l8, l10]|
+---+----------------+
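
As the caveat above says, this only merges direct (pairwise) overlaps. For transitive chains (row A overlaps B, B overlaps C), a connected-components approach works; here is a sketch using the external GraphFrames package (assumed to be on the classpath; the checkpoint path is arbitrary):

// Sketch: transitive merging via GraphFrames connected components.
import org.graphframes.GraphFrame

spark.sparkContext.setCheckpointDir("/tmp/cc-checkpoint") // required by connectedComponents

val exploded = df.select('id, explode('values).as("value"))
val edges = exploded.join(exploded.toDF("dst", "value"), "value")
  .select('id.as("src"), 'dst)                             // edge between row ids sharing an element
val vertices = df.select('id)
val cc = GraphFrame(vertices, edges).connectedComponents.run() // columns: id, component

exploded.join(cc, "id")
  .groupBy('component)
  .agg(min('id).as("id"), collect_set('value).as("values"))
  .select('id, 'values)
  .orderBy('id)
  .show()
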
answered 2020-05-21T20:42:59.780