我正在使用 Spark Scala 并且有一个要分组的数据集,然后将 GroupedData 发送到自定义函数。在自定义函数中,我将处理行并更新一个空数据框。
我有以下数据框DF1
:
+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+
| ACC_SECURITY|ACCOUNT_NO|COSTCENTER| BU| MPU|LONG_IND|SHORT_IND|SECURITY_ID|QUANTITY|POS_NEG_QUANTITY|PROCESSED|ALLOC_QUANTITY|NET_QUANTITY|
+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+
|3FA34789290X2| 3FA34789| 0800TS|BOXXBU|BOXXMP| 0101| 5279| 290X2| 18063| P| | 0| 0|
|3FA34782290X2| 3FA34782| 0800TS|BOXXBU|BOXXMP| 0102| 5322| 290X2| -863| N| | 0| 0|
|3FA34789290X2| 3FA34789| 0800TS|BOXXBU|BOXXMP| 0101| 5279| 290X2| -108926| N| | 0| 0|
|9211530135G71| 92115301| 08036C|BOXXBU|BOXXMP| 0154| 8380| 35G71| 8003| P| | 0| 0|
|9211530235G71| 92115302| 08036C|BOXXBU|BOXXMP| 0144| 8382| 35G71| -2883| N| | 0| 0|
+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+
分组后SECURITY_ID
,我得到 2 个基于SECURITY_ID
值(290X2 和 35G71)的数据集。这些数据集必须发送到自定义函数。
我试过了:
Groupby on,
SECURITY_ID
但它需要进行一些聚合,而我没有:DF1.groupBy("SECURITY_ID").agg(max("SECURITY_ID")).apply(F)
我不想要聚合,但只要我可以在分组数据集上传递块
F
中的函数,我仍然可以删除聚合列。apply
但apply
不采取任何自定义功能。窗口功能打开,
SECURITY_ID
但我不知道如何根据每个窗口执行自定义功能:val window = Window.partitionBy("security_id") val option2DF = DF1.withColumn("Quantity_Row", F over(window))
我想看看如何
F
在窗口上调用函数,而不是通过添加列。