我是 Flink 的新手,我正在尝试使用它来获取我的应用程序的大量实时视图。我想构建的动态视图中至少有一个是显示未满足 SLA 或实质上已过期的条目,其条件是简单的时间戳比较。因此,如果最近没有被事件触及,我基本上希望在我的动态表中显示一个条目。在开发环境中使用 Flink 1.6(受 AWS Kinesis 限制)时,我没有看到 Flink 正在重新评估条件,除非事件触及该条目。
我已将我的开发环境插入到从 Web 服务器发送实时访问日志事件的 Kinesis 流中。这不是我真正的用例,但它很容易开始测试。我编写了一个简单的表查询,它获取请求路径、它的最后访问时间,并计算一个布尔标志来指示它是否在最后一分钟内未被访问。我正在通过连接到 PrintSinkFunction 的收回流对此进行调试,因此所有更新/删除都将打印到我的控制台。
tEnv.registerDataStream("AccessLogs", accessLogs, "username, status, request, responseSize, referrer, userAgent, requestTime, ActionTime.rowtime");
Table paths = tEnv.sqlQuery("SELECT request AS path, MAX(requestTime) as lastTime, CASE WHEN MAX(requestTime) < CURRENT_TIMESTAMP - INTERVAL '1' MINUTE THEN 1 ELSE 0 END AS expired FROM AccessLogs GROUP BY request");
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(paths, Row.class);
retractStream .addSink(new PrintSinkFunction<>());
我希望当我访问一个页面时,一个 Add 事件会发送到这个流。然后,如果我等待 1 分钟(什么都不做),我表中的 CASE 语句将评估为 1,所以我应该看到设置了该标志的 Delete 然后 Add 事件。
我实际看到的是,在我再次加载该页面之前什么都没有发生。Delete 事件实际上设置了标志,而紧随其后的 Add 事件再次清除了它(因为它不再“过期)。
// add/delete, path, lastAccess, expired
(true,/mypage,2019-05-20 20:02:48.0,0) // first page load, add event
(false,/mypage,2019-05-20 20:02:48.0,1) // second load > 2 mins later, remove event for the entry with expired flag set
(true,/mypage,2019-05-20 20:05:01.0,0) // second load, add event
编辑:我在搜索中遇到的最有用的提示是创建一个ProcessFunction。我认为这是我可以使用我的动态表进行的工作(在某些情况下,我最终会使用中间流来查看计算日期),但希望不必如此。
我已经使用 ProcessFunction 方法,但它需要比我最初认为的更多的修补:
- 我必须在我的 POJO 中添加一个字段,该字段会在 onTimer() 方法中发生变化(可能是您每次只需更改的日期或版本)
- 我必须将此字段注册为动态表的一部分
- 我必须在查询中使用此字段,以便重新评估查询并更改布尔标志(即使我实际上并未使用新字段)。我只是将它添加为我的 SELECT 子句的一部分。