我想将两个连续查询(基于单个上游连接器的视图)堆叠在一起,并最终在管道末端的接收器结果中保持一致。
- 第一个视图将按源删除重复的事件。
- 第二个视图会将来自不同来源的相似事件合并为一个,完全外部连接第一个视图本身并使用 COALESCE。
一次只下沉这些转换中的一个似乎工作正常。最终产生一致的结果。
将它们组合在一起似乎不会处理第一个查询应该删除的迟到的重复事件(如果我们等待的时间足够长)并作为更新状态推送到第二个查询。
例如,如果我使用 toRetractStream 打印去重事件视图,我会得到 e1、e2、e3 作为结果。e4 被淘汰,因为它是 e3 的重复事件。
如果我打印第二个视图,我会得到 e1、e2、e3 和 e4 作为结果,因为 e3 和 e4 都有资格合并。但是,最终,当在第一个视图中发现 e4 是重复事件时,我希望从该结果中删除它。
是否可以使用 Flink Table API 做到这一点?
编辑:在下面分享一个示例用例。
-- Step 0: Create the upstream table with Kinesis connector: Events
Table
All events:
event_id, book_id, source, happened_at
11e5dc326,161111,source_1,2021-11-19T01:39:11
12e5dc326,171111,source_1,2021-11-19T01:39:11
18e5dc326,171111,source_2,2021-11-29T01:39:11
19e5dc326,171111,source_2,2022-11-29T01:39:11
20e5dc326,171111,source_2,2022-11-29T01:39:11
21e5dc326,171111,source_2,2021-11-30T01:39:11
-- Step 1: Dedupe events
CREATE VIEW EventsDeduped
AS
(
SELECT event_id, book_id, source, happenedAt,
sourceRawData
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY book_id, source, ORDER BY
happened_at ASC) AS row_num
FROM Events)
WHERE row_num = 1
)
)
tableEnv.toRetractStream[Row](tableEnv.sqlQuery("SELECT * FROM
EventsDeduped").print()
12> (true,11e5dc326-f,161111,source_1,2021-11-19T01:39:11)
12> (true,12e5dc326f,171111,source_1,2021-11-19T01:39:11)
10> (true,18e5dc326,171111,source_2,2021-11-29T01:39:11)
CREATE VIEW EventsDedupedSource1
AS
(
SELECT *
FROM EventsDeduped
WHERE source='source_1'
)
CREATE VIEW EventsDedupedSource2
AS
(
SELECT *
FROM EventsDeduped
WHERE source='source_2'
)
-- Step 2: Merge events coming from different sources
CREATE VIEW EventsDedupedMerged
AS
(
SELECT
COALESCE(source_2.event_id, source_1.event_id) AS event_id,
COALESCE(source_2.book_id, source_1.book_id) AS book_id,
COALESCE(source_2.source, source_1.source) AS source,
COALESCE(source_2.happened_at, source_1.happened_at) AS
happened_at,
CASE WHEN
source_2.book_id IS NOT NULL AND
source_1.book_id IS NOT NULL THEN True Else False END AS
merged
FROM
EventsDedupedSource1 source_1
FULL OUTER JOIN
EventsDedupedSource2 source_2
ON source_1.book_id = source_2.book_id AND ABS(TIMESTAMPDIFF(DAY,
source_1.happenedAt, source_2.happenedAt)) < 30
)
tableEnv.toRetractStream[Row](tableEnv.sqlQuery("SELECT * FROM EventsDedupedMerged").print()
<action>,event_id, book_id, source, happened_at, merged
7> (true,11e5dc326,161111,source_1,2021-11-19T01:39:11,false)
5> (true,12e5dc326,171111,source_1,2021-11-19T01:39:11,false)
5> (false,12e5dc326,171111,source_1,2021-11-19T01:39:11,false)
5> (true,18e5dc326,171111,source_2,2021-11-29T01:39:11,true)
5> (true,19e5dc326,171111,source_2,2022-11-29T01:39:11,false)
5> (true,20e5dc326,171111,source_2,2022-11-29T01:39:11,false)
5> (true,21e5dc326,171111,source_2,2021-11-30T01:39:11,true)
在此管道的末尾,预计只有18e5dc326事件合并字段 true,因为从第一步开始,id为 21e5dc326的事件应该已被删除。
但是,第二步仍然以某种方式将其与另一个事件合并,因此在其合并字段中具有 true 。