我有一个有效的 Pyspark Windowing 函数 (Spark 2.0),它需要过去 30 天 (86400*30) 秒,并计算每个 ID 列“a”中每个操作发生的次数。我应用此函数的数据集在“2018-01-01”和“2018-04-01”之间每天都有多条记录。因为这是一个 30 天的回顾,我不想将此函数应用于没有完整 30 天回顾的数据。为方便起见,我想从 2 月 1 日开始计算。我不能过滤掉一月份,因为二月份的计数需要它。我知道我可以在新数据框上添加一个过滤器并在 2 月份之前过滤掉数据,但是有没有办法在没有额外步骤的情况下做到这一点?不必进行可以节省时间的计算会很好。
这是代码:
from pyspark.sql import Window
from pyspark.sql import functions as F
windowsess = Window.partitionBy("id",'a').orderBy('ts').rangeBetween(-86400*30, Window.currentRow)
df4 = df3.withColumn("2h4_ct",F.count(df.a).over(windowsess))
当前数据集的模型。我不想手动转换col ts,所以我写了一个替代品。
id,a,timestamp,ts
1,soccer,2018-01-01 10:41:00, <unix_timestamp>
1,soccer,2018-01-13 10:40:00, <unix_timestamp>
1,soccer,2018-01-23 10:39:00, <unix_timestamp>
1,soccer,2018-02-01 10:38:00, <unix_timestamp>
1,soccer,2018-02-03 10:37:00, <unix_timestamp>
1,leagueoflegends,2018-02-04 10:36:00, <unix_timestamp>
用我编造的样本数据。我想返回以下行
1,soccer,2018-02-01 10:38:00, <unix_timestamp>,4
1,soccer,2018-02-03 10:37:00, <unix_timestamp>,5
1,leagueoflegends,2018-02-04 10:36:00, <unix_timestamp>,1
相反,我得到了这个:
1,soccer,2018-01-01 10:41:00, <unix_timestamp>,1
1,soccer,2018-01-13 10:40:00, <unix_timestamp>,2
1,soccer,2018-01-23 10:39:00, <unix_timestamp>,3
1,soccer,2018-02-01 10:38:00, <unix_timestamp>,4
1,soccer,2018-02-03 10:37:00, <unix_timestamp>,5
1,leagueoflegends,2018-02-04 10:36:00, <unix_timestamp>,1