1

我正在使用 Spark Scala 并且有一个要分组的数据集,然后将 GroupedData 发送到自定义函数。在自定义函数中,我将处理行并更新一个空数据框。

我有以下数据框DF1

+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+
| ACC_SECURITY|ACCOUNT_NO|COSTCENTER|    BU|   MPU|LONG_IND|SHORT_IND|SECURITY_ID|QUANTITY|POS_NEG_QUANTITY|PROCESSED|ALLOC_QUANTITY|NET_QUANTITY|
+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+
|3FA34789290X2|  3FA34789|    0800TS|BOXXBU|BOXXMP|    0101|     5279|      290X2|   18063|               P|         |             0|           0|
|3FA34782290X2|  3FA34782|    0800TS|BOXXBU|BOXXMP|    0102|     5322|      290X2|    -863|               N|         |             0|           0|
|3FA34789290X2|  3FA34789|    0800TS|BOXXBU|BOXXMP|    0101|     5279|      290X2| -108926|               N|         |             0|           0|
|9211530135G71|  92115301|    08036C|BOXXBU|BOXXMP|    0154|     8380|      35G71|    8003|               P|         |             0|           0|
|9211530235G71|  92115302|    08036C|BOXXBU|BOXXMP|    0144|     8382|      35G71|   -2883|               N|         |             0|           0|
+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+

分组后SECURITY_ID,我得到 2 个基于SECURITY_ID值(290X2 和 35G71)的数据集。这些数据集必须发送到自定义函数。

我试过了:

  1. Groupby on,SECURITY_ID但它需要进行一些聚合,而我没有:

    DF1.groupBy("SECURITY_ID").agg(max("SECURITY_ID")).apply(F) 
    

    我不想要聚合,但只要我可以在分组数据集上传递块F中的函数,我仍然可以删除聚合列。applyapply不采取任何自定义功能。

  2. 窗口功能打开,SECURITY_ID但我不知道如何根据每个窗口执行自定义功能:

    val window = Window.partitionBy("security_id") 
    val option2DF = DF1.withColumn("Quantity_Row", F over(window))
    

    我想看看如何F在窗口上调用函数,而不是通过添加列。

4

0 回答 0