0

我只是对流聚合函数中状态存储主题的保留时间(retention.ms)的计算感到困惑。

这是构建拓扑的流配置:

    Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId1");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.setProperty(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, String.valueOf(5));

    props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), 200);

附加保留设置为5
主题保留设置为200

假设我有这个用于创建拓扑的代码:(为了简单起见,我没有在聚合函数中做任何事情,我只想说明要构建的存储主题)
没有窗口

    //topology1
    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> source = streamsBuilder.stream("test-topic");
    source.groupByKey().aggregate(() -> "1",  (key, value, aggregate) -> "2");

Topology1 将创建状态存储主题,retention.ms=200
这里保留为 200(默认主题保留)

开窗:

    //topology2
    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> source = streamsBuilder.stream("test-topic");
    source.groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMillis(10)).grace(Duration.ofMillis(1)))
            .aggregate(() -> "1", (key, value, aggregate) -> "2");

Topology2 将创建状态存储主题,retention.ms=86400005
这里保留是 86400000 (???) + 5(额外保留)

通过设置保留窗口:

    //topology3
    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> source = streamsBuilder.stream("test-topic");
    source.groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMillis(10)).grace(Duration.ofMillis(1)))
            .aggregate(() -> "1",  (key, value, aggregate) -> "2"
                    , Materialized.<String, String, WindowStore<Bytes, byte[]>>as("agg")
                            .withRetention(Duration.ofMillis(300)));

Topology3 将创建状态存储主题,retention.ms=305
这里保留是 300(显式保留集)+ 5(附加保留)

要运行应用程序,还需要此代码:

    Topology topology = streamsBuilder.build();
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();   

那么,在拓扑状态存储主题中设置主题保留的逻辑是什么?
为什么在 Topology1 中不使用额外的保留?
为什么在 Topology2 中,不使用默认主题保留?
在 Topology2 中,86400000 与附加保留相加是什么参数?
最后,附加保留不在 Topology1 中使用,但在明确设置保留的 Topology3 中使用。

谁能解释这些背后的逻辑?

4

1 回答 1

0

对于窗口化的 KStreams,有一个默认值为 24 小时或 86400000 毫秒的本地保留时间。这就是 Topology2 保留时间背后的原因。

您可以通过 设置本地存储保留时间Materialized.withRetentionTime(...),这就是 Topology3 保留时间为 305ms 的原因

于 2021-04-15T16:50:44.190 回答