0

我有一个经常更新的 MySql 表。我想为过去 20 秒内更新的每个 id 拍摄快照并将值写入 redis。我使用 binlog 作为流输入,并将数据流转换为 Flink 表。我运行以下 sql。

SELECT id, ts, val
FROM my_tbl
WHERE (id, ts) IN
(
   SELECT id, MAX(ts)
   FROM my_tbl
   GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id
)

我知道表连接会使状态大小过大,我将 StreamQueryConfig 设置如下

qConfig.withIdleStateRetentionTime(Time.seconds(600), Time.seconds(1200));

我运行了一天的任务并得到了内存不足的错误。我怎么解决这个问题?

4

1 回答 1

1

您也可以使用时间窗口连接来解决此问题,而不是使用配置了空闲状态保留时间的常规连接。

以下查询应该可以解决问题。

SELECT id, ts, val
FROM my_tbl m1,
     (SELECT id, MAX(ts), TUMBLE_PROCTIME(proctime, INTERVAL '20' SECOND) as ptime
      FROM my_tbl
      GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id) m2
WHERE m1.id = m2.id AND m1.ts = m2.ts ANS
      m1.proctime BETWEEN m2.ptime - INTERVAL '25' SECOND AND m2.ptime

窗口连接谓词 ( BETWEEN) 确保自动清理状态。由于您使用的处理时间不准确,因此我添加了 5 秒的松弛时间。

于 2018-10-10T19:13:03.313 回答