我在我的项目中使用融合 Kafka,其中发送到特定主题的消息需要在保留时间后删除。所以我为各个主题设置了retention.ms,但它不起作用(在保留时间之后我仍然可以看到消息)
我浏览了大多数堆栈问题,但仍然无法找到 Kafka Retention.ms 不工作问题的正确原因/解决方案。
我按照以下步骤以毫秒为单位创建和设置保留时间。
创建了一个话题说。'用户状态'
按照下面的代码更新了它的retention.ms时间
from confluent_kafka.admin import AdminClient, ConfigResource, NewTopic, NewPartitions from confluent_kafka import Producer, Consumer, KafkaError, KafkaException topic_config = ConfigResource('topic', 'user_status') topic_config.set_config('retention.ms', '5000') admin.alter_configs([topic_config])
从生产者端发送消息。
等待了 6000 毫秒的时间。
试图接收来自特定主题的消息。但是我收到了消息,并且消息仍然没有被保留策略删除。
注意: 我确保在更新retention.ms之后,我验证了在kafka主题信息(主题描述)中已经更新了相同的内容。
此外,我使用 log.retention.check.interval.ms=1 ms 更新了 server.properties,并在更新属性文件后重新启动了 Kafka 服务。
我对上述问题的期望
我想将retention.ms 设置为单个主题,并且应该按照Kafka策略中的定义自动删除传递该时间的消息。
我当前的代码现在发生了什么。 即使在保留时间之后,消费者仍会收到消息。