使用窗口函数和 WITH 子句(CTE),筛选出 row_number 为 1 的行。看一下这个:
scala> val df = Seq((1,100,"AB1"),(5,200,"BC3"),(1,400,"FD3"),(6,600,"HJ2"),(1,900,"432"),(3,800,"DS2"),(2,500,"JT4")).toDF("ID","COST","CODE")
df: org.apache.spark.sql.DataFrame = [ID: int, COST: int ... 1 more field]
scala> df.show()
+---+----+----+
| ID|COST|CODE|
+---+----+----+
| 1| 100| AB1|
| 5| 200| BC3|
| 1| 400| FD3|
| 6| 600| HJ2|
| 1| 900| 432|
| 3| 800| DS2|
| 2| 500| JT4|
+---+----+----+
scala> df.createOrReplaceTempView("course")
scala> spark.sql(""" with tab1(select id,cost,code,collect_list(code) over(partition by id order by cost desc rows between current row and 5 following ) cc, row_number() over(partition by id order by cost desc) rc,sum(cost) over(partition by id order by cost desc rows between current row and 5 following) total from course) select id, total, cc from tab1 where rc=1 """).show(false)
+---+-----+---------------+
|id |total|cc |
+---+-----+---------------+
|1 |1400 |[432, FD3, AB1]|
|6 |600 |[HJ2] |
|3 |800 |[DS2] |
|5 |200 |[BC3] |
|2 |500 |[JT4] |
+---+-----+---------------+
scala>