1

我想将两个连续查询(基于单个上游连接器的视图)堆叠在一起,并最终在管道末端的接收器结果中保持一致。

  • 第一个视图将按源删除重复的事件。
  • 第二个视图会将来自不同来源的相似事件合并为一个,完全外部连接第一个视图本身并使用 COALESCE。

一次只下沉这些转换中的一个似乎工作正常。最终产生一致的结果。

将它们组合在一起似乎不会处理第一个查询应该删除的迟到的重复事件(如果我们等待的时间足够长)并作为更新状态推送到第二个查询。

例如,如果我使用 toRetractStream 打印去重事件视图,我会得到 e1、e2、e3 作为结果。e4 被淘汰,因为它是 e3 的重复事件。

如果我打印第二个视图,我会得到 e1、e2、e3 和 e4 作为结果,因为 e3 和 e4 都有资格合并。但是,最终,当在第一个视图中发现 e4 是重复事件时,我希望从该结果中删除它。

是否可以使用 Flink Table API 做到这一点?

编辑:在下面分享一个示例用例。

-- Step 0: Create the upstream table with Kinesis connector: Events 
  Table

  All events:
  event_id, book_id, source, happened_at
  11e5dc326,161111,source_1,2021-11-19T01:39:11
  12e5dc326,171111,source_1,2021-11-19T01:39:11
  18e5dc326,171111,source_2,2021-11-29T01:39:11
  19e5dc326,171111,source_2,2022-11-29T01:39:11
  20e5dc326,171111,source_2,2022-11-29T01:39:11
  21e5dc326,171111,source_2,2021-11-30T01:39:11


  -- Step 1: Dedupe events
  CREATE VIEW EventsDeduped
  AS
  (
    SELECT event_id, book_id, source, happenedAt, 
    sourceRawData
     FROM (
        SELECT *,
         ROW_NUMBER() OVER (PARTITION BY book_id, source, ORDER BY 
     happened_at ASC) AS row_num
        FROM Events)
        WHERE row_num = 1
    )
  )

  tableEnv.toRetractStream[Row](tableEnv.sqlQuery("SELECT * FROM 
  EventsDeduped").print()

  12> (true,11e5dc326-f,161111,source_1,2021-11-19T01:39:11)
  12> (true,12e5dc326f,171111,source_1,2021-11-19T01:39:11)
  10> (true,18e5dc326,171111,source_2,2021-11-29T01:39:11)


  CREATE VIEW EventsDedupedSource1
  AS
  (
    SELECT  *
    FROM EventsDeduped
    WHERE source='source_1'
  )

  CREATE VIEW EventsDedupedSource2
  AS
  (
    SELECT  *
    FROM EventsDeduped
    WHERE source='source_2'
  )

  -- Step 2: Merge events coming from different sources
  CREATE VIEW EventsDedupedMerged
  AS
  (
    SELECT
     COALESCE(source_2.event_id, source_1.event_id) AS event_id,
     COALESCE(source_2.book_id, source_1.book_id) AS book_id,
     COALESCE(source_2.source, source_1.source) AS source,
     COALESCE(source_2.happened_at, source_1.happened_at) AS 
    happened_at,
     CASE WHEN
       source_2.book_id IS NOT NULL AND
       source_1.book_id IS NOT NULL THEN True Else False END AS 
    merged
    FROM
     EventsDedupedSource1 source_1
    FULL OUTER JOIN
     EventsDedupedSource2 source_2
    ON source_1.book_id = source_2.book_id AND ABS(TIMESTAMPDIFF(DAY, 
    source_1.happenedAt, source_2.happenedAt)) < 30
  )

  tableEnv.toRetractStream[Row](tableEnv.sqlQuery("SELECT * FROM EventsDedupedMerged").print()

  <action>,event_id, book_id, source, happened_at, merged
 7> (true,11e5dc326,161111,source_1,2021-11-19T01:39:11,false)
 5> (true,12e5dc326,171111,source_1,2021-11-19T01:39:11,false)
 5> (false,12e5dc326,171111,source_1,2021-11-19T01:39:11,false)
 5> (true,18e5dc326,171111,source_2,2021-11-29T01:39:11,true)
 5> (true,19e5dc326,171111,source_2,2022-11-29T01:39:11,false)
 5> (true,20e5dc326,171111,source_2,2022-11-29T01:39:11,false)
 5> (true,21e5dc326,171111,source_2,2021-11-30T01:39:11,true)

  

在此管道的末尾,预计只有18e5dc326事件合并字段 true,因为从第一步开始,id为 21e5dc326的事件应该已被删除。

但是,第二步仍然以某种方式将其与另一个事件合并,因此在其合并字段中具有 true 。

4

1 回答 1

1

这与我的查询中的错误有关。我加入了事件表而不是重复数据表。

无论如何保留这篇文章让其他人看到,是的,您可以随后在 Flink Table API 上加入依赖视图,结果最终将保持一致。

于 2022-03-01T23:22:51.960 回答