1

我的要求是基于 2 个事件(EVT_A 和 EVT_B 独立于顺序)生成触发器。这里是期待

1. EVT_A arrived. --> No action
2. EVT_B arrived  --> Should Trigger
3. EVT_B arrived  --> should Trigger since A was received previously (o/p should include A and current B)
4. EVT_A arrived  --> should Trigger since B was received previously (o/p should include current A and last B)
5. EVT_A arrived  --> should Trigger since B was received previously (o/p should include current A and last B)

我尝试了以下但没有成功。

SELECT E.*
From MyEvents
MATCH_RECOGNIZE (
    ORDER BY procTime
    MEASURES ARRAY[
        Event(A.id, A.name, A.date),
        Event(B.id, B.name, B.date)
    ] AS Events
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (A C* B)
    DEFINE
        A AS name in ('EVT_A', 'EVT_B'),
        B AS name in ('EVT_A', 'EVT_B') AND B.name <> A.name,
        C AS name not in ('EVT_A', 'EVT_B')
) AS E;

我也试过“ AFTER MATCH SKIP TO FIRST A ”。但它也有例外。任何建议我如何使用 Flink SQL CEP 或 Flink 中的任何其他方式来实现这一点。

4

1 回答 1

0

这似乎是 RichFlatMap 或 ProcessFunction 是最简单的方法的情况。只需要一点状态:

ValueState<Event> lastA;
ValueState<Event> lastB;

然后处理每个传入事件的逻辑是这样的:

if EVT_A
  store event as lastA
  emit lastB with this event unless lastB is null
if EVT_B
  store event as lastB
  emit lastA with this event unless lastA is null

如果您正在寻找使用 Flink 托管状态的介绍,文档中有一个教程

于 2020-09-03T19:29:35.927 回答