2

我们有一个 Kafka 流聚合拓扑。我们需要控制 changeLog 主题的大小以降低 Kafka 存储成本。因此,我们在拓扑中使用转换器(DSL API)来安排一个标点,使用 keyValueStore.delete() 从 stateStore 中删除旧记录。
我能够验证删除后,在标点符号的进一步预定触发器上,状态存储中不存在已删除的密钥。但它是否也会从 changeLog 主题中删除记录?更重要的是,它是否也减少了 changeLog 主题的大小,从而控制了 Kafka 存储成本?

4

2 回答 2

1

是的,对状态存储的更改将应用​​于更改日志主题。

于 2020-08-28T22:11:08.637 回答
1

changelog不,当您发出“删除”命令时,主题中没有实际的记录删除。请注意,“删除”命令实际上是具有写入主题(或任何其他)的null值(aka )的记录 - 请参见此处tombstonechangelog

空值以特殊方式解释:具有空值的记录表示记录键的“DELETE”或墓碑

所以,事实上,解释是让它感觉像是删减的解释;可以将changelog主题(您必须知道确切的主题名称)作为 KStream 或使用 Kafka Consumer API 读取,并会在tombstone那里找到记录(直到被压缩或保留线程删除)。但是,如果您changelog使用 KTable 读取一个或任何压缩主题,那么一条tombstone记录将确定从关联存储中删除 - 尽管事实上它实际上存在于相关压缩主题中,但您将不再在存储中找到相关键。

如果在主题上启用了压缩策略(默认情况下在changelog主题上启用),则其记录将被删除,直到特定键的最后一个。因此,在某些时候,您将只有删除记录,因为之前具有相同键的记录已被压缩 Kafka 线程删除。

于 2020-08-31T19:01:54.270 回答