2

我有一个问题应该在 SQL 之外解决,但由于业务限制需要在 SQL 中解决。

  • 所以,请不要告诉我在 SQL 之外的数据摄取时这样做,我想这样做,但这不是一个选项......


我有一个事件流,具有 4 个主要属性....

  • 源设备
  • 事件的时间戳
  • 事件的“类型”
  • 事件的“有效负载” (代表各种数据类型的可怕 VARCHAR)


我需要做的是将流分成几部分(我将其称为“会话”)

  • 每个会话都特定于设备(实际上,PARTITION BY device_id
  • 一个会话不得包含多个相同类型的事件


为了缩短示例,我将它们限制为仅包含时间戳和 event_type ...

 timestamp | event_type          desired_session_id
-----------+------------        --------------------
     0     |     1                      0
     1     |     4                      0
     2     |     2                      0
     3     |     3                      0

     4     |     2                      1
     5     |     1                      1
     6     |     3                      1
     7     |     4                      1

     8     |     4                      2

     9     |     4                      3
    10     |     1                      3

    11     |     1                      4
    12     |     2                      4

理想化的最终输出可能是旋转最终结果......

device_id | session_id | event_type_1_timestamp | event_type_1_payload |  event_type_2_timestamp | event_type_2_payload ...

(但这还不是一成不变的,但我需要“知道”哪些事件构成一个会话,它们的时间戳是什么,以及它们的有效负载是什么。可能只需将 session_id 列附加到输入就足够了,只要我不“丢失”其他属性。)


有:

  • 12 种离散事件类型
  • 数十万台设备
  • 每台设备数十万个事件
  • 每个“会话”大约 6-8 个事件的“标准”
  • 但有时一个会话可能只有 1 个或全部 12 个

这些因素意味着半笛卡尔积之类的,嗯,不太理想,但可能是“唯一的方法”。


我(在我的脑海中)玩过分析函数和间隙和孤岛类型的过程,但永远无法达到目标。我总是回到一个我“想要”一些标志的地方,我可以将它们从一行带到另一行并根据需要重置它们......

在 SQL 中不起作用的伪代码...

flags = [0,0,0,0,0,0,0,0,0]
session_id = 0
for each row in stream
   if flags[row.event_id] == 0 then
       flags[row.event_id] = 1
   else
       session_id++
       flags = [0,0,0,0,0,0,0,0,0]
   row.session_id = session_id

对此的任何 SQL 解决方案都表示赞赏,但如果您还可以考虑“同时发生”的事件,您将获得“奖励积分”......

If multiple events happen at the same timestamp
  If ANY of those events are in the "current" session
    ALL of those events go in to a new session
  Else
    ALL of those events go in to the "current" session

If such a group of event include the same event type multiple times
  Do whatever you like
  I'll have had enough by that point...
  But set the session as "ambiguous" or "corrupt" with some kind of flag?
4

3 回答 3

1

我不是 100% 确定这可以在 SQL 中完成。但我有一个可能有效的算法的想法:

  • 枚举每个事件的计数
  • 将每个点的最大计数作为事件的“分组”(这是会话)

所以:

select t.*,
       (max(seqnum) over (partition by device order by timestamp) - 1) as desired_session_id
from (select t.*,
             row_number() over (partition by device, event_type order by timestamp) as seqnum
      from t
     ) t;

编辑:

评论太长了。我有一种感觉,这需要递归 CTE (RBAR)。这是因为您不能停留在单行并查看累积信息或相邻信息以确定该行是否应该开始新会话。

当然,在某些情况下它是显而易见的(例如,上一行具有相同的事件)。而且,也有可能有一些巧妙的方法可以聚合以前的数据,这使得它成为可能。

编辑二:

如果没有递归 CTE(RBAR),我认为这是不可能的。这不是一个数学证明,但这就是我的直觉的来源。

想象一下,您正在回顾当前的 4 行,并且您有:

1
2
1
2
1  <-- current row

这个会话是什么?它不是确定的。考虑:

e     s           vs        e     s          
1     1                     2     1    <-- row not in look back
1     2                     1     1
2     2                     2     2
1     3                     1     2
2     3                     2     3
1     4                     1     3

价值取决于更远的过去。显然,这个例子可以一直延伸到第一个事件。我认为没有办法“聚合”早期的值来区分这两种情况。

如果您可以确定地说给定事件是新会话的开始,则该问题是可以解决的。这似乎需要完整的先验知识,至少在某些情况下是这样。显然在某些情况下这很容易——例如连续发生两个事件。不过,我怀疑这些是此类序列的“少数”。

也就是说,您并没有在整个表中完全被 RBAR 所困扰,因为您需要device_id并行化。我不确定您的环境是否可以做到这一点,但在 BQ 或 Postgres 中,我会:

  • 沿着每个设备聚合以创建具有时间和事件信息的结构数组。
  • 遍历数组一次,可能使用自定义代码。
  • 通过重新加入原始表或取消嵌套逻辑来重新分配会话。
于 2018-10-30T11:24:22.053 回答
1

UPD 基于讨论(未经检查/测试,粗略的想法):

WITH
trailing_events as (
    select *, listagg(event_type::varchar,',') over (partition by device_id order by ts rows between previous 12 rows and current row) as events
    from tbl
)
,session_flags as (
    select *, f_get_session_flag(events) as session_flag
    from trailing_events
)
SELECT
 *
,sum(session_flag::int) over (partition by device_id order by ts) as session_id
FROM session_flags

f_get_session_flag在哪里

create or replace function f_get_session_flag(arr varchar(max))
returns boolean
stable as $$
stream = arr.split(',')
flags = [0,0,0,0,0,0,0,0,0,0,0,0]
is_new_session = False
for row in stream:
   if flags[row.event_id] == 0:
       flags[row.event_id] = 1
       is_new_session = False
   else:
       session_id+=1
       flags = [0,0,0,0,0,0,0,0,0,0,0,0]
       is_new_session = True
return is_new_session
$$ language plpythonu;

上一个答案:

标志可以复制为事件运行计数和 2 的除法余数:

1 -> 1%2 = 1
2 -> 2%2 = 0
3 -> 3%2 = 1
4 -> 4%2 = 0
5 -> 5%2 = 1
6 -> 6%2 = 0

并连接成一个位掩码(类似于flags伪代码中的数组)。唯一棘手的一点是何时将所有标志完全重置为零并启动新的会话 ID,但我可以非常接近。如果您的示例表被调用t并且它具有tstype列,则脚本可能如下所示:

with
-- running count of the events
t1 as (
    select
     *
    ,sum(case when type=1 then 1 else 0 end) over (order by ts) as type_1_cnt
    ,sum(case when type=2 then 1 else 0 end) over (order by ts) as type_2_cnt
    ,sum(case when type=3 then 1 else 0 end) over (order by ts) as type_3_cnt
    ,sum(case when type=4 then 1 else 0 end) over (order by ts) as type_4_cnt
    from t
)
-- mask
,t2 as (
    select
     *
    ,case when type_1_cnt%2=0 then '0' else '1' end ||
     case when type_2_cnt%2=0 then '0' else '1' end ||
     case when type_3_cnt%2=0 then '0' else '1' end ||
     case when type_4_cnt%2=0 then '0' else '1' end as flags
    from t1
)
-- previous row's mask
,t3 as (
    select
     *
    ,lag(flags) over (order by ts) as flags_prev
    from t2
)
-- reset the mask if there is a switch from 1 to 0 at any position
,t4 as (
    select *
    ,case
        when (substring(flags from 1 for 1)='0' and substring(flags_prev from 1 for 1)='1')
        or (substring(flags from 2 for 1)='0' and substring(flags_prev from 2 for 1)='1')
        or (substring(flags from 3 for 1)='0' and substring(flags_prev from 3 for 1)='1')
        or (substring(flags from 4 for 1)='0' and substring(flags_prev from 4 for 1)='1')
        then '0000'
        else flags
     end as flags_override
    from t3
)
-- get the previous value of the reset mask and same event type flag for corner case 
,t5 as (
    select *
    ,lag(flags_override) over (order by ts) as flags_override_prev
    ,type=lag(type) over (order by ts) as same_event_type
    from t4
)
-- again, session ID is a switch from 1 to 0 OR same event type (that can be a switch from 0 to 1)
select
 ts
,type
,sum(case
 when (substring(flags_override from 1 for 1)='0' and substring(flags_override_prev from 1 for 1)='1')
        or (substring(flags_override from 2 for 1)='0' and substring(flags_override_prev from 2 for 1)='1')
        or (substring(flags_override from 3 for 1)='0' and substring(flags_override_prev from 3 for 1)='1')
        or (substring(flags_override from 4 for 1)='0' and substring(flags_override_prev from 4 for 1)='1')
        or same_event_type
        then 1
        else 0 end
 ) over (order by ts) as session_id
from t5
order by ts
;

您可以添加必要的分区并扩展到 12 种事件类型,此代码旨在处理您提供的示例表......它并不完美,如果您运行子查询,您会看到标志被重置的频率比需要的频率高但是总体而言,除了会话 id 2 的极端情况外,它在具有相同事件类型 = 4 的另一个会话结束后具有单个事件类型 = 4,因此我添加了一个简单的查找same_event_type并将其用作新的另一个条件会话 ID,希望这将适用于更大的数据集。

于 2018-10-30T14:40:25.717 回答
0

我决定采用的解决方案实际上是“不要在 SQL 中这样做”,方法是将实际会话推迟到用 python 编写的标量函数。

--
-- The input parameter should be a comma delimited list of identifiers
-- Each identified should be a "power of 2" value, no lower than 1
-- (1, 2, 4, 8, 16, 32, 64, 128, etc, etc)
--
-- The input '1,2,4,2,1,1,4' will give the output '0001010'
--
CREATE OR REPLACE FUNCTION public.f_indentify_collision_indexes(arr varchar(max))
RETURNS VARCHAR(MAX)
STABLE AS
$$
    stream = map(int, arr.split(','))
    state = 0
    collisions = []
    item_id = 1
    for item in stream:
        if (state & item) == (item):
            collisions.append('1')
            state = item
        else:
            state |= item
            collisions.append('0')
        item_id += 1

    return ''.join(collisions)
$$
LANGUAGE plpythonu;

注意:如果有数百种事件类型,我不会使用它;)


实际上,我按顺序传入了事件的数据结构,返回的是新会话开始的数据结构。

我选择了实际的数据结构,以便尽可能简单地处理 SQL 方面的问题。(可能不是最好的,对其他想法非常开放。)

INSERT INTO
    sessionised_event_stream
SELECT
    device_id,
    REGEXP_COUNT(
        LEFT(
            public.f_indentify_collision_indexes(
                LISTAGG(event_type_id, ',')
                    WITHIN GROUP (ORDER BY session_event_sequence_id)
                    OVER (PARTITION BY device_id)
            ),
            session_event_sequence_id::INT
        ),
        '1',
        1
    ) + 1
        AS session_login_attempt_id,
    session_event_sequence_id,
    event_timestamp,
    event_type_id,
    event_data
FROM
(
    SELECT
        *,
        ROW_NUMBER()
            OVER (PARTITION BY device_id
                      ORDER BY event_timestamp, event_type_id, event_data)
                AS session_event_sequence_id
    FROM
        event_stream
)
  1. 为事件断言确定性顺序(包含同时发生的事件等)
    ROW_NUMBER() OVER (stuff) AS session_event_sequence_id

  2. 创建一个逗号分隔的 event_type_id 列表
    LISTAGG(event_type_id, ',')=>'1,2,4,8,2,1,4,1,4,4,1,1'

  3. 使用 python 计算边界
    public.f_magic('1,2,4,8,2,1,4,1,4,4,1,1')=>'000010010101'

  4. 对于序列中的第一个事件,计算 1 的数量,直到并包括“边界”中的第一个字符。对于序列中的第二个事件,计算 1 的数量,直到边界中的第二个字符包括在内,等等。
    event 01 = 1=> boundaries = '0' => session_id = 0
    event 02 = 2=> boundaries = '00' => session_id = 0
    event 03 = 4=> boundaries = '000' => session_id = 0
    event 04 = 8=> boundaries = '0000' => session_id = 0
    event 05 = 2=> boundaries = '00001' => session_id = 1
    event 06 = 1=> boundaries = '000010' => session_id = 1
    event 07 = 4=> boundaries = '0000100' => session_id = 1
    event 08 = 1=> boundaries = '00001001' => session_id = 2
    event 09 = 4=> boundaries = '000010010' => session_id = 2
    event 10 = 4=> boundaries = '0000100101' => session_id = 3
    event 11 = 1=> boundaries = '00001001010' => session_id = 3
    event 12 = 1=> boundaries = '000010010101'=>session_id = 4

    REGEXP_COUNT( LEFT('000010010101', session_event_sequence_id), '1', 1 )

结果不是很快,但比我尝试过的其他选项更强大并且仍然更好。它的“感觉”是(也许,也许,我不确定,警告,警告)如果流中有 100 个项目,LIST_AGG()则调用一次,python UDF 被调用 100 次。我可能错了。我见过 Redshift 做的更糟;)


伪代码被证明是一个更糟糕的选择。

Write some SQL that can find "the next session" from any given stream.

Run that SQL once storing the results in a temp table.
=> Now have the first session from every stream

Run it again using the temp table as an input
=> We now also have the second session from every stream

Keep repeating this until the SQL inserts 0 rows in to the temp table
=> We now have all the sessions from every stream

计算每个会话所花费的时间相对较短,实际上主要是向 RedShift 发出重复请求的开销。这也意味着主导因素是“最长的流中有多少会话” (在我的例子中,0.0000001% 的流比平均值长 1000 倍。)

在大多数个别情况下, python 版本实际上速度较慢,但​​不受那些烦人的异常值的支配。这意味着总体而言,python 版本比此处描述的“外部循环”版本快了大约 10 倍。它还使用了一个存储桶来加载更多的 CPU 资源,但是现在经过的时间是更重要的因素 :)

于 2018-11-05T10:16:59.307 回答