我只是对流聚合函数中状态存储主题的保留时间(retention.ms)的计算感到困惑。
这是构建拓扑的流配置:
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId1");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, String.valueOf(5));
props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), 200);
附加保留设置为5
主题保留设置为200
假设我有这个用于创建拓扑的代码:(为了简单起见,我没有在聚合函数中做任何事情,我只想说明要构建的存储主题)
没有窗口
//topology1
final StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> source = streamsBuilder.stream("test-topic");
source.groupByKey().aggregate(() -> "1", (key, value, aggregate) -> "2");
Topology1 将创建状态存储主题,retention.ms=200
这里保留为 200(默认主题保留)
开窗:
//topology2
final StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> source = streamsBuilder.stream("test-topic");
source.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(10)).grace(Duration.ofMillis(1)))
.aggregate(() -> "1", (key, value, aggregate) -> "2");
Topology2 将创建状态存储主题,retention.ms=86400005
这里保留是 86400000 (???) + 5(额外保留)
通过设置保留窗口:
//topology3
final StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> source = streamsBuilder.stream("test-topic");
source.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(10)).grace(Duration.ofMillis(1)))
.aggregate(() -> "1", (key, value, aggregate) -> "2"
, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("agg")
.withRetention(Duration.ofMillis(300)));
Topology3 将创建状态存储主题,retention.ms=305
这里保留是 300(显式保留集)+ 5(附加保留)
要运行应用程序,还需要此代码:
Topology topology = streamsBuilder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
那么,在拓扑状态存储主题中设置主题保留的逻辑是什么?
为什么在 Topology1 中不使用额外的保留?
为什么在 Topology2 中,不使用默认主题保留?
在 Topology2 中,86400000 与附加保留相加是什么参数?
最后,附加保留不在 Topology1 中使用,但在明确设置保留的 Topology3 中使用。
谁能解释这些背后的逻辑?