我的原始数据框中有“事件”列,我想添加其他 2 列。
事件 | 事件滞后 | 历史事件 |
---|---|---|
0 | ñ | ñ |
0 | 0 | N0 |
1 | 0 | N00 |
0 | 1 | N001 |
我的原始数据框中有“事件”列,我想添加其他 2 列。
事件 | 事件滞后 | 历史事件 |
---|---|---|
0 | ñ | ñ |
0 | 0 | N0 |
1 | 0 | N00 |
0 | 1 | N001 |
from pyspark.sql.functions import lag, col, monotonically_increasing_id, collect_list, concat_ws
from pyspark.sql import Window
#sample data
df= sc.parallelize([[0], [0], [1], [0]]).toDF(["Event"])
#add row index to the dataframe
df = df.withColumn("row_idx", monotonically_increasing_id())
w = Window.orderBy("row_idx")
#add 'Event_Lag' column to the dataframe
df = df.withColumn("Event_Lag", lag(col('Event').cast('string')).over(w))
df = df.fillna({'Event_Lag':'N'})
#finally add 'Hist_Event' column to the dataframe and remove row index column (i.e. 'row_idx') to have the final result
df = df.withColumn("Hist_Event", collect_list(col('Event_Lag')).over(w)).\
withColumn("Hist_Event", concat_ws("","Hist_Event")).\
drop("row_idx")
df.show()
样本输入:
+-----+
|Event|
+-----+
| 0|
| 0|
| 1|
| 0|
+-----+
输出是:
+-----+---------+----------+
|Event|Event_Lag|Hist_Event|
+-----+---------+----------+
| 0| N| N|
| 0| 0| N0|
| 1| 0| N00|
| 0| 1| N001|
+-----+---------+----------+