假设我有一个DataSet
看起来像这样的:
Name | Grade
---------------
Josh | 94
Josh | 87
Amanda | 96
Karen | 78
Amanda | 90
Josh | 88
我想创建一个新的DataSet
,其中每个名称有 3 行,其中额外的行(如果有)是从同名的行中采样的(例如,Karen 将有三个相同的行)。
如何在不遍历每个名称的情况下做到这一点?
假设我有一个DataSet
看起来像这样的:
Name | Grade
---------------
Josh | 94
Josh | 87
Amanda | 96
Karen | 78
Amanda | 90
Josh | 88
我想创建一个新的DataSet
,其中每个名称有 3 行,其中额外的行(如果有)是从同名的行中采样的(例如,Karen 将有三个相同的行)。
如何在不遍历每个名称的情况下做到这一点?
资料准备:
val df = Seq(("Josh",94),("Josh",87),("Amanda",96),("Karen",78),("Amanda",90),("Josh",88)).toDF("Name","Grade")
仅当您的数据skewed
用于 a时才执行以下Name
操作:添加一个随机数,并为每个 过滤前 3 个随机数Name
。
val df2 = df.withColumn("random", round(rand()*10))
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy("Name").orderBy("random")
val df3 = df2.withColumn("row_number",row_number.over(windowSpec))
.filter($"row_number" <= 3)
现在,汇总每个 的值Name
并重复 3 次,以确保每个 至少有 3 条记录Name
。然后最后取第一个 3 个值,然后explode
df4.groupBy("Name").agg(collect_list("Grade") as "grade_list")
.withColumn("temp_list", slice( flatten(array_repeat($"grade_list", 3)), 1,3))
.select($"Name",explode($"temp_list") as "Grade").show
备注:
grade_list
,因此复制 3 次不会造成伤害。Window
步骤,则可以组合使用when( size($"grade_list") === n, ).otherwise()
上述不必要的重复。