1

我想为可能发生变化的数据框生成顺序唯一 ID。当我说更改时,这意味着在我今天生成 ID 之后明天将添加更多行数。当添加更多行时,我想查找具有生成的 id 的 id 列,并为新添加的数据增加

+-------+--------------------+-------------+
|deal_id|           deal_name|Unique_id    |
+-------+--------------------+--------------
| 613760|ABCDEFGHI           |            1|    
| 613740|TEST123             |            2|             
| 598946|OMG                 |            3|    

假设我明天获得更多数据,我想将相同的数据附加到这个数据帧,并且唯一 id 应该增加到 4 并继续。

+-------+--------------------+-------------+
|deal_id|           deal_name|Unique_id    |
+-------+--------------------+--------------
| 613760|ABCDEFGHI           |            1|    
| 613740|TEST123             |            2|             
| 598946|OMG                 |            3|
| 591234|OM21                |            4|
| 988217|Otres               |            5|
.
.
.

代码片段

deals_df_final = deals_df.withColumn("Unique_id",F.monotonically_increasing_id())

但这并没有给出顺序 ID。

我可以尝试使用索引的 row_num 和 RDD zip,但看起来数据框将是不可变的。

请问有什么帮助吗?我希望能够在添加数据时生成并增加 id。

4

1 回答 1

0

非常简短的说明,如果它有帮助 - 我有同样的问题,这篇文章中的第二个例子帮助了我:https ://kb.databricks.com/sql/gen-unique-increasing-values.html

我当前正在进行的代码:

from pyspark.sql import (
    SparkSession,
    functions as F,
    window as W
)

df_with_increasing_id = df.withColumn("monotonically_increasing_id", F.monotonically_increasing_id())
window = W.Window.orderBy(F.col('monotonically_increasing_id'))
df_with_consecutive_increasing_id = df_with_increasing_id.withColumn('increasing_id', F.row_number().over(window))
    df = df_with_consecutive_increasing_id.drop('monotonically_increasing_id')
# now find the maximum value in the `increasing_id` column in the current dataframe before appending new
previous_max_id = df.agg({'increasing_id': 'max'}).collect()[0]
previous_max_id = previous_max_id['max(increasing_id)']
# CREATE NEW ROW HERE
# and then create new ids (same way as creating them originally)
# then union or vertically concatenate it with the old dataframe to get the combined one
df.withColumn("cnsecutiv_increase", F.col("increasing_id") + F.lit(previous_max_id)).show()
于 2021-11-29T17:04:38.787 回答