您好我正在尝试将每个窗口的最后一个值扩展到该列的窗口的其余部分,count
以便创建一个标志,该标志识别寄存器是否是窗口的最后一个值。我以这种方式尝试过,但没有奏效。
样本 DF:
val df_197 = Seq [(Int, Int, Int, Int)]((1,1,7,10),(1,10,4,300),(1,3,14,50),(1,20,24,70),(1,30,12,90),(2,10,4,900),(2,25,30,40),(2,15,21,60),(2,5,10,80)).toDF("policyId","FECMVTO","aux","IND_DEF").orderBy(asc("policyId"), asc("FECMVTO"))
df_197.show
+--------+-------+---+-------+
|policyId|FECMVTO|aux|IND_DEF|
+--------+-------+---+-------+
| 1| 1| 7| 10|
| 1| 3| 14| 50|
| 1| 10| 4| 300|
| 1| 20| 24| 70|
| 1| 30| 12| 90|
| 2| 5| 10| 80|
| 2| 10| 4| 900|
| 2| 15| 21| 60|
| 2| 25| 30| 40|
+--------+-------+---+-------+
val juntar_riesgo = 1
val var_entidad_2 = $"aux"
//Particionar por uno o dos campos en funcion del valor de la variable juntar_riesgo
//Se creará window_number_2 basado en este particionamiento
val winSpec = if(juntar_riesgo == 1) {
Window.partitionBy($"policyId").orderBy($"FECMVTO")
} else {
Window.partitionBy(var_entidad_2,$"policyId").orderBy("FECMVTO")
}
val df_308 = df_307.withColumn("window_number", row_number().over(winSpec))
.withColumn("count", last("window_number",true) over (winSpec))
.withColumn("FLG_LAST_WDW", when(col("window_number") === col("count"),1).otherwise(lit(0))).show
结果(第一个分区的所有元素的列数需要为 5,第二个分区的所有元素的列数需要为 4):
+--------+-------+---+-------+-------------+-----+------------+
|policyId|FECMVTO|aux|IND_DEF|window_number|count|FLG_LAST_WDW|
+--------+-------+---+-------+-------------+-----+------------+
| 1| 1| 7| 10| 1| 1| 1|
| 1| 3| 14| 50| 2| 2| 1|
| 1| 10| 4| 300| 3| 3| 1|
| 1| 20| 24| 70| 4| 4| 1|
| 1| 30| 12| 90| 5| 5| 1|
| 2| 5| 10| 80| 1| 1| 1|
| 2| 10| 4| 900| 2| 2| 1|
| 2| 15| 21| 60| 3| 3| 1|
| 2| 25| 30| 40| 4| 4| 1|
+--------+-------+---+-------+-------------+-----+------------+
然后我读到当你使用orderBy
afterwindowPartition
子句时,你必须指定子句.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
才能实现我所需要的。但是,当我尝试它时,我遇到了这个错误:
val juntar_riesgo = 1
val var_entidad_2 = $"aux"
//Particionar por uno o dos campos en funcion del valor de la variable juntar_riesgo
//Se creará window_number_2 basado en este particionamiento
val winSpec = if(juntar_riesgo == 1) {
Window.partitionBy($"policyId").orderBy($"FECMVTO")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
} else {
Window.partitionBy(var_entidad_2,$"policyId").orderBy("FECMVTO")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
}
val df_198 = df_197.withColumn("window_number", row_number().over(winSpec))
.withColumn("count", last("window_number",true) over (winSpec))
.withColumn("FLG_LAST_WDW", when(col("window_number") === col("count"),1).otherwise(lit(0))).show
ERROR: org.apache.spark.sql.AnalysisException: Window Frame specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$()) must match the required frame specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$());
谢谢你的帮助!