2

我正在尝试使用低级处理器 API。我正在使用处理器 API 对传入记录进行数据聚合,并将聚合记录写入 RocksDB。

但是,我想保留添加到 RocksDB 中的记录,使其仅在 24 小时内处于活动状态。24 小时后应删除记录。这可以通过更改 ttl 设置来完成。但是,没有太多文档可以让我获得一些帮助。

如何更改 ttl 值?我应该使用什么 java api 将 ttl 时间设置为 24 小时,以及当前默认的 ttl 设置时间是什么?

4

1 回答 1

3

我相信这目前没有通过 api 或配置公开。RocksDBStore 在打开 RocksDB 时会传递一个硬编码的 TTL: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore。 java#L158 并且硬编码的值只是 TTL_SECONDS = TTL_NOT_USED (-1) (参见同一文件中的第 79 行)。

目前有 2 张关于在州商店中公开 TTL 支持的公开票:KAFKA-4212 和 KAFKA-4273: https ://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND% 20text%20~%20%22rocksdb%20ttl%22 我建议您对其中一个描述您的用例进行评论,以使他们继续前进。

在此期间,如果您现在需要 TTL 功能,状态存储是可插入的,并且 RocksDBStore 源随时可用,因此您可以分叉它并设置您的 TTL 值(或者,就像与 KAFKA-4273 相关的拉取请求建议的那样,源它来自配置)。

我知道这并不理想,并真诚地希望有人提出更令人满意的答案。

于 2017-05-09T06:47:30.493 回答