需求如下:我们要跟踪用户事件并根据以下逻辑创建会话
- 30 分钟不活动
或 - UTC 结束时间
为此,我们将所有用户事件发布到 pubsub。在 apache Beam 管道中,我们读取 pubsub 消息,按 visitor_id 和 event_date 分组,并将它们分组到用户会话窗口中,条件如下:
beam.WindowInto(window.Sessions(30 * 60), timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW)
(pipeline
| "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription=ap_options.subscription)
| 'Keyed on Visitor Id and Date' >> beam.Map(lambda x: ((x['visitor_id'], x['event_date']), x))
| 'User Session Window' >> beam.WindowInto(window.Sessions(30 * 60),
timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW)
| 'Group on VisitorId And Date' >> beam.GroupByKey()
| 'Get Sessionsed Rows' >> beam.ParDo(AddSessionId())
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(table=ap_options.bq_table,
schema=TABLE_SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)
)
问题是——我们在 bigquery 中看到 2 个连续事件相隔超过 30 分钟的记录。例如:
正在发生的事情是——beam 比较最后两个事件,如果它们相隔超过 30 分钟,它会将所有事件转储到 bigquery 中,包括最后一个事件。而它应该将相隔超过 30 分钟的最后一个事件添加到一个单独的会话中。