I need to count, per day, how many times an event A is followed by an event B within 15 minutes. The stream might look like A1, A2, B1, B2, A3, B3, B4, B5, A4, A5, A6, A7, B6. In my case the matched pairs should be (A2, B1), (A3, B3), (A7, B6). I need to receive the result in real time whenever a match occurs. I have already tried a few things. I think this can only be done with Flink CEP, but Flink SQL CEP (MATCH_RECOGNIZE) does not support aggregation; it only detects event occurrences. In that case, how can this task be done with a single SQL statement?
I have tried doing it in two steps: first I use Flink SQL CEP (MATCH_RECOGNIZE) to detect the matches and sink them to Kafka; in the second step I consume from that Kafka topic and apply a windowed aggregation.
First step:

```sql
SELECT
  pins AS pin,
  'first-step' AS result_id,
  CAST(order_amount AS VARCHAR) AS result_value,
  event_time AS result_time
FROM stra_dtpipeline
MATCH_RECOGNIZE (
  PARTITION BY pin
  ORDER BY event_time
  MEASURES
    t1.pin AS pins,
    '1' AS order_amount,
    LOCALTIMESTAMP AS event_time
  ONE ROW PER MATCH
  AFTER MATCH SKIP TO NEXT ROW
  PATTERN (t1 t2) WITHIN INTERVAL '30' SECOND
  DEFINE
    t1 AS t1.act_type = '100001',
    t2 AS t2.act_type = '100002'
)
```

Second step (note: a time-range over-window needs `RANGE BETWEEN`, not `ROWS BETWEEN`):

```sql
SELECT
  pin,
  'job5' AS result_id,
  CAST(
    SUM(1) OVER (
      PARTITION BY pin, CAST(DATE_FORMAT(event_time, '%Y%m%d') AS VARCHAR)
      ORDER BY event_time
      RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW
    ) AS VARCHAR
  ) AS result_value,
  CURRENT_TIMESTAMP AS result_time
FROM stra_dtpipeline_mid
WHERE result_id = 'first-step'
  AND DAYOFMONTH(CURRENT_DATE) = DAYOFMONTH(event_time)
```
I would like to accomplish this task with a single SQL statement. Is that possible?
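For reference, here is the shape a single-statement version might take: wrap MATCH_RECOGNIZE in a subquery and run the over-window aggregation on top of it. This is an untested sketch, assuming the planner accepts an over-aggregation on a MATCH_RECOGNIZE result; it reuses the table and column names above, uses `MATCH_ROWTIME()` so the output keeps a rowtime attribute for the outer window, and uses the 15-minute bound from the question rather than the 30 seconds in the first attempt:

```sql
SELECT
  pin,
  'job5' AS result_id,
  CAST(
    SUM(1) OVER (
      PARTITION BY pin, DATE_FORMAT(event_time, '%Y%m%d')
      ORDER BY event_time
      RANGE BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW
    ) AS VARCHAR
  ) AS result_value,
  CURRENT_TIMESTAMP AS result_time
FROM (
  -- inner query: emit one row per matched (A, B) pair
  SELECT pins AS pin, event_time
  FROM stra_dtpipeline
  MATCH_RECOGNIZE (
    PARTITION BY pin
    ORDER BY event_time
    MEASURES
      t1.pin AS pins,
      MATCH_ROWTIME() AS event_time  -- keep a rowtime attribute for the outer OVER window
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (t1 t2) WITHIN INTERVAL '15' MINUTE
    DEFINE
      t1 AS t1.act_type = '100001',
      t2 AS t2.act_type = '100002'
  )
) matched
```

Each match immediately emits a row with the running per-pin, per-day count, which should satisfy the real-time requirement without the intermediate Kafka topic.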