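I have two Flink dynamic tables, Event and Configuration.
Event has the structure [id, myTimestamp] and Configuration has the structure [id, myValue, myTimestamp].
I am trying to write a Flink SQL query that returns Event.id, Configuration.myValue, or Event.id, null if the Event row does not match any id from Configuration.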
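Example of the expected behavior (with Event and Configuration initially empty).
Each line of the example must be read as:
[DATA_RECEIVED] => TARGET_TABLE : EXPECTED_OUTPUT
Since the SQL query is built from joins, its result is inserted into an UpsertSink (the first value of each output tuple is the upsert boolean).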
[myId-1, 10] => EventTable : [(true, myId-1, null)]
[myId-1, myValue-A, 15] => ConfigurationTable : [(false, myId-1, null), (true, myId-1, myValue-A)]
[myId-1, myValue-A, 20] => ConfigurationTable : [(false, myId-1, myValue-A), (true, myId-1, myValue-A)]
[myId-1, myValue-B, 25] => ConfigurationTable : [(false, myId-1, myValue-A), (true, myId-1, myValue-B)]
[myId-1, 30] => EventTable : [(false, myId-1, null), (true, myId-1, myValue-B)]
So I wrote this query:
SELECT
  Event.id,
  Configuration.myValue
FROM
  (SELECT id, MAX(myTimestamp) AS myTimestamp FROM Event GROUP BY id) AS Event
LEFT JOIN
  (SELECT id, LATEST_VAL(myValue, myTimestamp) AS myValue, MAX(myTimestamp) AS myTimestamp FROM Configuration GROUP BY id, myValue) AS Configuration
ON Event.id = Configuration.id
GROUP BY Event.id, Configuration.myValue
where LATEST_VAL is a UDF that returns the myValue associated with MAX(myTimestamp).
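To make the intended semantics of LATEST_VAL concrete, here is a minimal Python sketch of the aggregation logic. It is not the actual Flink UDF (which would be a Java or Scala aggregate function) and is only a model of "return the value seen with the largest timestamp". The names LatestValAccumulator, accumulate and latest_val are hypothetical, and the rule that the later-received row wins on equal timestamps is an assumption.

```python
from dataclasses import dataclass
from typing import Iterable, Optional, Tuple


@dataclass
class LatestValAccumulator:
    """Hypothetical accumulator: the value seen with the largest timestamp so far."""
    value: Optional[str] = None
    timestamp: Optional[int] = None


def accumulate(acc: LatestValAccumulator, value: str, timestamp: int) -> None:
    # Assumption: on equal timestamps, the row received last wins.
    if acc.timestamp is None or timestamp >= acc.timestamp:
        acc.value = value
        acc.timestamp = timestamp


def latest_val(rows: Iterable[Tuple[str, int]]) -> Optional[str]:
    """Return the value paired with the maximum timestamp, or None if there are no rows."""
    acc = LatestValAccumulator()
    for value, timestamp in rows:
        accumulate(acc, value, timestamp)
    return acc.value
```

For example, applied to the three Configuration rows for myId-1 used in this question, (myValue-A, 15), (myValue-A, 20) and (myValue-B, 25), this model yields myValue-B once all three rows have been received.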
However, I get behavior that I do not understand. Here are the observed results:
[myId-1, 10] => EventTable : [(true, myId-1, null)] // OK
[myId-1, myValue-A, 15] => ConfigurationTable : [(false, myId-1, null), (true, myId-1, myValue-A)] // OK
[myId-1, myValue-A, 20] => ConfigurationTable : [(false, myId-1, myValue-A), (true, myId-1, null), (false, myId-1, null), (true, myId-1, myValue-A)] // NOT OK
[myId-1, myValue-B, 25] => ConfigurationTable : [(false, myId-1, myValue-A), (true, myId-1, null), (false, myId-1, null), (true, myId-1, myValue-B)] // NOT OK
[myId-1, 30] => EventTable : [(false, myId-1, null), (true, myId-1, myValue-B)] // OK
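How do you explain the difference between the expected and the observed behavior? Why are the extra outputs (true, myId-1, null), (false, myId-1, null) emitted?
Is it possible to adjust the SQL query to get the desired behavior?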
Note:
- I am using Flink 1.8