我正在使用 apache kafka 生成和使用大小为 5GB 的文件。我想知道是否有一种方法可以在使用主题后自动删除来自主题的消息。我有什么方法可以跟踪消费的消息吗?我不想手动删除它。
5 回答
在 Kafka 中,消费的责任就是消费者的责任,这也是 Kafka 具有如此出色的横向扩展能力的主要原因之一。
使用高级消费者 API 将通过在 Zookeeper 中提交消耗的偏移量来自动为您执行此操作(或者最近的配置选项被特殊的 Kafka 主题用于跟踪消耗的消息)。
简单的消费者 API 让您自己处理如何以及在何处跟踪消费的消息。
Kafka 中的消息清除是通过指定主题的保留时间或为其定义磁盘配额自动完成的,因此对于一个 5GB 文件的情况,该文件将在您定义的保留期过后被删除,无论是否已被消耗。
您不能在消费时删除 Kafka 消息
Kafka 没有在消息被消费时直接删除消息的机制。
我在尝试这样做时发现的最接近的事情是这个技巧,但它未经测试,并且根据设计它不适用于最新消息:
这样做的一个潜在技巧是使用 (a) 压缩主题和 (b) 自定义分区器 (c) 一对拦截器的组合。
该过程将遵循:
- 在写入之前,使用生产者拦截器将 GUID 添加到密钥的末尾。
- 使用自定义分区程序忽略 GUID 以进行分区
- 使用压缩主题,然后您可以通过 producer.send(key+GUID, null) 删除所需的任何单个消息
- 使用消费者拦截器在读取时删除 GUID。
但是您不应该需要此功能。
有 1 个或多个消费者,并且希望一条消息总共只被他们消费一次?
将它们放在同一个消费者组中。
想要避免过多的消息填满磁盘吗?
根据磁盘空间和/或时间设置保留。
根据我的知识,您可以通过减少存储时间从日志中删除消耗的数据。日志的默认时间设置为168 小时,然后数据会自动从您创建的Kafka-Topic中删除。所以,我的建议是减少对server.properties
位于配置文件夹中的 go 的访问,并将168更改为最短时间。因此,在您为log.retention.hours设置的特定时间之后,它们就没有数据了。因此,您的问题将得到解决。
log.retention.hours=168
继续编码
您可以使用 consumer_group :Kafka 保证消息只能被组中的单个消费者读取。https://www.tutorialspoint.com/apache_kafka/apache_kafka_consumer_group_example.htm
我刚刚在这个问题上运行并构建了一个可以定期运行的脚本,以将使用的记录“标记”为已删除。Kafka 不会立即释放空间,但会删除偏移量在“活动”之外的分区。
https://gist.github.com/ThePsyjo/b717d2eaca2deb09b8130b3e917758f6