0

需求如下:我们要跟踪用户事件并根据以下逻辑创建会话

  • 30 分钟不活动
  • UTC 结束时间

为此,我们将所有用户事件发布到 pubsub。在 apache Beam 管道中,我们读取 pubsub 消息,按 visitor_id 和 event_date 分组,并将它们分组到用户会话窗口中,条件如下:

beam.WindowInto(window.Sessions(30 * 60), timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW)

            (pipeline
     | "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription=ap_options.subscription)
     | 'Keyed on Visitor Id and Date' >> beam.Map(lambda x: ((x['visitor_id'], x['event_date']), x))
     | 'User Session Window' >> beam.WindowInto(window.Sessions(30 * 60),
                                                timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW)
     | 'Group on VisitorId And Date' >> beam.GroupByKey()
     | 'Get Sessionsed Rows' >> beam.ParDo(AddSessionId())
     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(table=ap_options.bq_table,
                                                      schema=TABLE_SCHEMA,
                                                      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                      create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)
     )

问题是——我们在 bigquery 中看到 2 个连续事件相隔超过 30 分钟的记录。例如:

在此处输入图像描述

正在发生的事情是——beam 比较最后两个事件,如果它们相隔超过 30 分钟,它会将所有事件转储到 bigquery 中,包括最后一个事件。而它应该将相隔超过 30 分钟的最后一个事件添加到一个单独的会话中。

4

0 回答 0