我是卡夫卡的新手。我们正在尝试将数据从 csv 文件导入到 Kafka。我们需要每天导入,也就是说前一天的数据已经过时了。如何在 python 中删除 Kafka 主题下的所有消息?或者我怎么能在 python 中删除 Kafka 主题?或者我看到有人建议等待数据过期,如果可以的话,我该如何设置数据过期时间?任何建议将不胜感激!
谢谢
我是卡夫卡的新手。我们正在尝试将数据从 csv 文件导入到 Kafka。我们需要每天导入,也就是说前一天的数据已经过时了。如何在 python 中删除 Kafka 主题下的所有消息?或者我怎么能在 python 中删除 Kafka 主题?或者我看到有人建议等待数据过期,如果可以的话,我该如何设置数据过期时间?任何建议将不胜感激!
谢谢
您不能删除 Kafka 主题中的消息。你可以:
log.retention.*
属性,这基本上是消息的到期。您可以选择基于时间的到期(例如保留六小时或更早的消息)或基于空间的到期(例如保留最大 1 GB 的消息)。请参阅代理配置并搜索保留。您可以为不同的主题设置不同的值。但我认为您根本不需要删除主题中的消息。因为您的 Kafka 消费者会跟踪已处理的消息。因此,当您阅读今天的所有消息时,Kafka 消费者会保存此信息,而您明天将只阅读新消息。
另一种可能的解决方案是Log compaction。但它更复杂,可能不是你需要的。基本上,您可以为 Kafka 主题中的每条消息设置一个密钥。如果您使用相同的密钥发送两条不同的消息,Kafka 将只保留主题中的最新消息,并删除所有具有相同密钥的旧消息。您可以将其视为一种“键值存储”。具有相同键的每条消息只会更新特定键下的值。但是,嘿,你真的不需要这个,这只是 FYI :-)。
最简单的方法是简单地删除主题。我在 Python 自动化测试套件中使用它,我想验证一组特定的测试消息是通过 Kafka 发送的,并且不想看到以前测试运行的结果
def delete_kafka_topic(topic_name):
call(["/usr/bin/kafka-topics", "--zookeeper", "zookeeper-1:2181", "--delete", "--topic", topic_name])