It looks like your window has no partitions, and the events don't contain the same number of records. With that in mind, the solution that came to mind is to use each event's starting position to retrieve the corresponding value.
Ordering by timestamp, we first extract the position of each row:
from pyspark.sql import Window
from pyspark.sql.functions import col, rank, collect_list, expr

df = (
    spark.createDataFrame(
        [
            {'timestamp': '2021-02-02 01:03:55', 'col1': 's1'},
            {'timestamp': '2021-02-02 01:04:16.952854', 'col1': 's1', 'col2': 'other_ind'},
            {'timestamp': '2021-02-02 01:04:32.398155', 'col1': 's1'},
            {'timestamp': '2021-02-02 01:04:53.793089', 'col1': 's1', 'col2': 'event_start_ind', 'col3': 'event_1_value'},
            {'timestamp': '2021-02-02 01:05:10.936913', 'col1': 's1'},
            {'timestamp': '2021-02-02 01:05:36', 'col1': 's1', 'col2': 'other_ind'},
            {'timestamp': '2021-02-02 01:05:42', 'col1': 's1'},
            {'timestamp': '2021-02-02 01:05:43', 'col1': 's1'},
            {'timestamp': '2021-02-02 01:05:44', 'col1': 's1', 'col2': 'event_start_ind', 'col3': 'event_2_value'},
            {'timestamp': '2021-02-02 01:05:46.623198', 'col1': 's1'},
            {'timestamp': '2021-02-02 01:06:50', 'col1': 's1'},
            {'timestamp': '2021-02-02 01:07:19.607685', 'col1': 's1'}
        ]
    )
    .withColumn('timestamp', col('timestamp').cast('timestamp'))
    # rank() over an unpartitioned window moves all rows to a single partition
    .withColumn("line", rank().over(Window.orderBy("timestamp")))
)
df.show(truncate=False)
+----+--------------------------+---------------+-------------+----+
|col1|timestamp |col2 |col3 |line|
+----+--------------------------+---------------+-------------+----+
|s1 |2021-02-02 01:03:55 |null |null |1 |
|s1 |2021-02-02 01:04:16.952854|other_ind |null |2 |
|s1 |2021-02-02 01:04:32.398155|null |null |3 |
|s1 |2021-02-02 01:04:53.793089|event_start_ind|event_1_value|4 |
|s1 |2021-02-02 01:05:10.936913|null |null |5 |
|s1 |2021-02-02 01:05:36 |other_ind |null |6 |
|s1 |2021-02-02 01:05:42 |null |null |7 |
|s1 |2021-02-02 01:05:43 |null |null |8 |
|s1 |2021-02-02 01:05:44 |event_start_ind|event_2_value|9 |
|s1 |2021-02-02 01:05:46.623198|null |null |10 |
|s1 |2021-02-02 01:06:50 |null |null |11 |
|s1 |2021-02-02 01:07:19.607685|null |null |12 |
+----+--------------------------+---------------+-------------+----+
Next, we identify the start of each event:
df_event_start = (
    df.filter(col("col2") == 'event_start_ind')
    .select(
        col("line").alias("event_start_line"),
        col("col3").alias("event_value")
    )
)
df_event_start.show()
+----------------+-------------+
|event_start_line| event_value|
+----------------+-------------+
| 4|event_1_value|
| 9|event_2_value|
+----------------+-------------+
Using the event_start information, we look up the next valid event start for each row:
df_with_event_starts = (
    df.join(
        # single-row DataFrame holding all event-start lines, cross-joined onto every row
        df_event_start.select(collect_list('event_start_line').alias("event_starts"))
    )
    # keep the first event-start line at or after the current line
    .withColumn("next_valid_event", expr("element_at(filter(event_starts, x -> x >= line), 1)"))
)
df_with_event_starts.show(truncate=False)
+----+--------------------------+---------------+-------------+----+------------+----------------+
|col1|timestamp |col2 |col3 |line|event_starts|next_valid_event|
+----+--------------------------+---------------+-------------+----+------------+----------------+
|s1 |2021-02-02 01:03:55 |null |null |1 |[4, 9] |4 |
|s1 |2021-02-02 01:04:16.952854|other_ind |null |2 |[4, 9] |4 |
|s1 |2021-02-02 01:04:32.398155|null |null |3 |[4, 9] |4 |
|s1 |2021-02-02 01:04:53.793089|event_start_ind|event_1_value|4 |[4, 9] |4 |
|s1 |2021-02-02 01:05:10.936913|null |null |5 |[4, 9] |9 |
|s1 |2021-02-02 01:05:36 |other_ind |null |6 |[4, 9] |9 |
|s1 |2021-02-02 01:05:42 |null |null |7 |[4, 9] |9 |
|s1 |2021-02-02 01:05:43 |null |null |8 |[4, 9] |9 |
|s1 |2021-02-02 01:05:44 |event_start_ind|event_2_value|9 |[4, 9] |9 |
|s1 |2021-02-02 01:05:46.623198|null |null |10 |[4, 9] |null |
|s1 |2021-02-02 01:06:50 |null |null |11 |[4, 9] |null |
|s1 |2021-02-02 01:07:19.607685|null |null |12 |[4, 9] |null |
+----+--------------------------+---------------+-------------+----+------------+----------------+
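In case the higher-order functions are unfamiliar: filter keeps the array elements matching the lambda, and element_at(..., 1) takes the first survivor. A minimal standalone check of the same expression, using the array [4, 9] and the line number 5 to mirror row 5 of the output above:

# for line 5 and event starts [4, 9], the first start at or after the line is 9
spark.sql("SELECT element_at(filter(array(4, 9), x -> x >= 5), 1) AS next_valid_event").show()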
Finally, we join back on the event start line to retrieve the correct value:
(
    df_with_event_starts.join(
        df_event_start,
        col("next_valid_event") == col("event_start_line"),
        how="left"
    )
    .drop("line", "event_starts", "next_valid_event", "event_start_line")
    .show(truncate=False)
)
+----+--------------------------+---------------+-------------+-------------+
|col1|timestamp |col2 |col3 |event_value |
+----+--------------------------+---------------+-------------+-------------+
|s1 |2021-02-02 01:03:55 |null |null |event_1_value|
|s1 |2021-02-02 01:04:16.952854|other_ind |null |event_1_value|
|s1 |2021-02-02 01:04:32.398155|null |null |event_1_value|
|s1 |2021-02-02 01:04:53.793089|event_start_ind|event_1_value|event_1_value|
|s1 |2021-02-02 01:05:10.936913|null |null |event_2_value|
|s1 |2021-02-02 01:05:36 |other_ind |null |event_2_value|
|s1 |2021-02-02 01:05:42 |null |null |event_2_value|
|s1 |2021-02-02 01:05:43 |null |null |event_2_value|
|s1 |2021-02-02 01:05:44 |event_start_ind|event_2_value|event_2_value|
|s1 |2021-02-02 01:05:46.623198|null |null |null |
|s1 |2021-02-02 01:06:50 |null |null |null |
|s1 |2021-02-02 01:07:19.607685|null |null |null |
+----+--------------------------+---------------+-------------+-------------+
This solution will give you problems when processing large volumes of data. If you can work out a key for each event, I'd suggest sticking with your initial window-function approach; in that case you can test the last or first SQL functions (ignoring nulls).
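For completeness, a minimal sketch of that idea on this sample data: since col3 is only populated on event-start rows, a backward fill with first(..., ignorenulls=True) over a forward-looking window reproduces the result above. Note the window here is still unpartitioned, so the same scalability caveat applies until you add an event key as a partition column.

from pyspark.sql import Window
from pyspark.sql.functions import first

# from each row, look forward and take the first non-null col3 (the next event's value)
w = Window.orderBy("timestamp").rowsBetween(Window.currentRow, Window.unboundedFollowing)

df.withColumn("event_value", first("col3", ignorenulls=True).over(w)).show(truncate=False)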
Hopefully someone will come up with a better solution for you.
Tip: providing the DataFrame-creation script in your question is very helpful.