1

我是 Flink 的新手,我正在尝试使用它来获取我的应用程序的大量实时视图。我想构建的动态视图中至少有一个是显示未满足 SLA 或实质上已过期的条目,其条件是简单的时间戳比较。因此,如果最近没有被事件触及,我基本上希望在我的动态表中显示一个条目。在开发环境中使用 Flink 1.6(受 AWS Kinesis 限制)时,我没有看到 Flink 正在重新评估条件,除非事件触及该条目。

我已将我的开发环境插入到从 Web 服务器发送实时访问日志事件的 Kinesis 流中。这不是我真正的用例,但它很容易开始测试。我编写了一个简单的表查询,它获取请求路径、它的最后访问时间,并计算一个布尔标志来指示它是否在最后一分钟内未被访问。我正在通过连接到 PrintSinkFunction 的收回流对此进行调试,因此所有更新/删除都将打印到我的控制台。

tEnv.registerDataStream("AccessLogs", accessLogs, "username, status, request, responseSize, referrer, userAgent, requestTime, ActionTime.rowtime");

Table paths = tEnv.sqlQuery("SELECT request AS path, MAX(requestTime) as lastTime, CASE WHEN MAX(requestTime) < CURRENT_TIMESTAMP - INTERVAL '1' MINUTE THEN 1 ELSE 0 END AS expired FROM AccessLogs GROUP BY request");

DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(paths, Row.class);
retractStream .addSink(new PrintSinkFunction<>());

我希望当我访问一个页面时,一个 Add 事件会发送到这个流。然后,如果我等待 1 分钟(什么都不做),我表中的 CASE 语句将评估为 1,所以我应该看到设置了该标志的 Delete 然后 Add 事件。

我实际看到的是,在我再次加载该页面之前什么都没有发生。Delete 事件实际上设置了标志,而紧随其后的 Add 事件再次清除了它(因为它不再“过期)。

// add/delete, path, lastAccess, expired
(true,/mypage,2019-05-20 20:02:48.0,0) // first page load, add event
(false,/mypage,2019-05-20 20:02:48.0,1) // second load > 2 mins later, remove event for the entry with expired flag set
(true,/mypage,2019-05-20 20:05:01.0,0) // second load, add event

编辑:我在搜索中遇到的最有用的提示是创建一个ProcessFunction。我认为这是我可以使用我的动态表进行的工作(在某些情况下,我最终会使用中间流来查看计算日期),但希望不必如此。

我已经使用 ProcessFunction 方法,但它需要比我最初认为的更多的修补:

  1. 我必须在我的 POJO 中添加一个字段,该字段会在 onTimer() 方法中发生变化(可能是您每次只需更改的日期或版本)
  2. 我必须将此字段注册为动态表的一部分
  3. 我必须在查询中使用此字段,以便重新评估查询并更改布尔标志(即使我实际上并未使用新字段)。我只是将它添加为我的 SELECT 子句的一部分。
4

1 回答 1

1

您的方法看起来很有希望,但 Flink 的 Table API / SQL 不支持与移动的“现在”时间戳进行比较(目前)。

我将分两步解决这个问题。

  1. 在 upsert 模式下注册动态表,即request基于版本时间戳(requestTime在您的情况下)每个键(在您的情况下)更新插入的表。生成的动态表将保存每个请求的最新行。
  2. 使用像您这样的简单过滤谓词进行查询,比较动态(upsert)表行的版本时间戳,并过滤掉所有时间戳太接近现在的行。

不幸的是,Flink 中还没有这两个功能(更新插入转换和与移动的“现在”时间戳进行比较)。不过,还有一些正在进行的 upsert 表转换工作。

于 2019-05-27T09:00:39.580 回答