0

我在我的项目中使用融合 Kafka,其中发送到特定主题的消息需要在保留时间后删除。所以我为各个主题设置了retention.ms,但它不起作用(在保留时间之后我仍然可以看到消息)

我浏览了大多数堆栈问题,但仍然无法找到 Kafka Retention.ms 不工作问题的正确原因/解决方案。

我按照以下步骤以毫秒为单位创建和设置保留时间。

  1. 创建了一个话题说。'用户状态'

  2. 按照下面的代码更新了它的retention.ms时间

     from confluent_kafka.admin import AdminClient, ConfigResource, NewTopic, NewPartitions
     from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
    
     topic_config = ConfigResource('topic', 'user_status')
     topic_config.set_config('retention.ms', '5000')
     admin.alter_configs([topic_config])
    
  3. 从生产者端发送消息。

  4. 等待了 6000 毫秒的时间。

  5. 试图接收来自特定主题的消息。但是我收到了消息,并且消息仍然没有被保留策略删除。

注意: 我确保在更新retention.ms之后,我验证了在kafka主题信息(主题描述)中已经更新了相同的内容。

此外,我使用 log.retention.check.interval.ms=1 ms 更新了 server.properties,并在更新属性文件后重新启动了 Kafka 服务。

我对上述问题的期望

我想将retention.ms 设置为单个主题,并且应该按照Kafka策略中的定义自动删除传递该时间的消息。

我当前的代码现在发生了什么。 即使在保留时间之后,消费者仍会收到消息。

4

0 回答 0