6

日志压缩主题不应该针对同一个键保留重复项。但在我们的例子中,当发送具有相同键的新值时,不会删除前一个值。可能是什么问题?

val TestCompactState: KTable[String, TestCompact] = builder.table[String, TestCompact](kafkaStreamConfigs.getString("testcompact-source"),
   (TestCompactmaterialized).withKeySerde(stringSerde).withValueSerde(TestCompactSerde)) 

我得到的 实际结果

Offsets      Keys        Messages
5            {"id":5}   {"id":5,"namee":"omer","__deleted":"false"}
6            {"id":5}   {"id":5,"namee":"d","__deleted":"false"}

我只想要针对相同键预期结果的最新记录

6            {"id":5}   {"id":5,"namee":"d","__deleted":"false"}
4

3 回答 3

6

这种行为可能有多种原因。压缩清理策略不会在每条传入消息之后运行。相反,有代理配置

log.cleaner.min.compaction.lag.ms:消息在日志中保持未压缩的最短时间。仅适用于正在压缩的日志。

类型:长;默认值:0;有效值:; 更新模式:集群范围

这默认为0所以这可能不是原因但值得检查。

需要注意的是,该compact策略从不压缩当前段。消息仅适用于非活动段上的压缩。确保验证

log.segment.bytes:单个日志文件的最大大小

类型:int;默认:1073741824;有效值:[14,...];更新模式:集群范围

压缩通常由日志当前(“脏”)段中的数据触发。术语“脏”来自未清洁/未压缩。还有另一种配置可以帮助控制压实。

log.cleaner.min.cleanable.ratio:脏日志与总日志的最小比率,以便有资格进行清理。如果还指定了 log.cleaner.max.compaction.lag.ms 或 log.cleaner.min.compaction.lag.ms 配置,则日志压缩器会认为日志符合压缩条件,只要:(i)已达到脏比率阈值,并且日志至少有 log.cleaner.min.compaction.lag.ms 持续时间的脏(未压缩)记录,或者(ii)如果日志最多有脏(未压缩)记录log.cleaner.max.compaction.lag.ms 周期。

类型:双;默认值:0.5;有效值:;更新模式:集群范围

默认情况下,要压缩的消息的删除延迟非常高,如下面的配置描述所示。

log.cleaner.max.compaction.lag.ms:消息在日志中保持不符合压缩条件的最长时间。仅适用于正在压缩的日志。

类型:长;默认:9223372036854775807;有效值:; 更新模式:集群范围

总而言之,您观察所描述的内容可能有几个原因。并且非常重要的是要知道压缩主题不提供任何保证相同键具有重复消息。它只能保证“至少”保留相同密钥的最新消息。

有一个不错的博客更详细地解释了日志压缩。

于 2020-04-10T14:47:11.100 回答
5

据我所知,不可能应用日志压缩策略来为每个密钥保留一条消息即使您设置cleanup.policy=compact(主题级别)或log.cleanup.policy=compact(全局级别),也不能保证只保留最新消息而压缩旧消息。

根据Kafka 官方文档

日志压缩为我们提供了更细粒度的保留机制,从而保证我们至少保留每个主键的最后一次更新

于 2020-04-10T13:41:46.033 回答
0

分区的活动段永远不会被压缩,因此在开始删除旧的重复项之前,可能需要一些时间和更多的消息被发送到主题。

于 2021-05-25T21:59:46.220 回答