0

在我的项目中使用 Flink Table SQL 时,我发现如果GROUP BY我的 SQL 中有任何子句,检查点的大小会大大增加。

例如,

INSERT INTO COMPANY_POST_DAY
SELECT
    sta_date,
    company_id,
    company_name
FROM
    FCBOX_POST_COUNT_VIEW

检查点大小将小于 500KB。

但是这样使用的时候,

INSERT INTO COMPANY_POST_DAY
SELECT
    sta_date,
    company_id,
    company_name,
    sum(ed_post_count)
FROM
    FCBOX_POST_COUNT_VIEW
GROUP BY
    sta_date, company_id, company_name, TUMBLE(procTime, INTERVAL '1' SECOND)

即使没有处理任何消息,检查点大小也会超​​过 70MB。像这样,

图片在这里。

但是当使用 DataStream API 而keyBy不是 Table SQLGROUP BY时,检查点的大小是正常的,小于 1MB。

为什么?

--------更新于2019-03-25--------

在做了一些测试和阅读源码后,我们发现这是 RocksDB 的原因。

当使用 RockDB 作为 state backend 时,checkpoint 的大小会超过每个 key 5MB 左右,而当使用 filesystem 作为 state backend 时,checkpoint 的大小会下降到每个 key 不到 100KB。

为什么 RocksDB 需要这么大的空间来保存状态?我们什么时候应该选择 RocksDB?

4

1 回答 1

0

首先,我不会将 70 MB 视为巨大的状态。有许多具有多个 TB 状态的 Flink 作业。关于为什么两个查询的状态大小不同的问题:

第一个查询是一个简单的投影查询,这意味着可以独立处理每条记录。因此,查询不需要“记住”任何记录,而只需要恢复的流偏移量。

第二个查询执行窗口聚合,需要记住每个窗口的中间结果(部分和),直到时间进展到足以使结果成为最终结果并且可以发出。

由于 Flink SQL 查询被翻译成 DataStream 操作符,SQL 查询和使用keyBy().window(). 两者都运行几乎相同的代码。

更新:已确定状态增加的原因是由 RocksDBStateBackend 引起的。此开销不是每个键的开销,而是每个有状态运算符的开销。由于 RocksDBStateBackend 旨在保存多个 GB 到 TB 的状态大小,因此几 MB 的开销可以忽略不计。

于 2019-03-19T08:26:27.413 回答