2

我正在kafka中进行数据复制。但是,kafka 日志文件的大小增加得非常快。一天的大小达到 5 GB。作为这个问题的解决方案,我想立即删除处理过的数据。我在 AdminClient 中使用删除记录方法来删除偏移量。但是当我查看日志文件时,与该偏移量对应的数据并没有被删除。

RecordsToDelete recordsToDelete = RedcordsToDelete.beforeOffset(offset);
TopicPartition topicPartition = new TopicPartition(topicName,partition);
Map<TopicPartition,RecordsToDelete> deleteConf = new HashMap<>();
deleteConf.put(topicPartition,recordsToDelete);
adminClient.deleteRecords(deleteConf);

我不想要像(log.retention.hours , log.retention.bytes , log.segment.bytes , log.cleanup.policy=delete)这样的建议

因为我只想删除消费者消费的数据。在这个解决方案中,我还删除了没有被消费的数据。

你有什么建议?

4

2 回答 2

2

你没有做错什么。您提供的代码有效,我已经对其进行了测试。以防万一我忽略了您的代码中的某些内容,我的是:

public void deleteMessages(String topicName, int partitionIndex, int beforeIndex) {
    TopicPartition topicPartition = new TopicPartition(topicName, partitionIndex);
    Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
    deleteMap.put(topicPartition, RecordsToDelete.beforeOffset(beforeIndex));
    kafkaAdminClient.deleteRecords(deleteMap);
}

我使用了组:'org.apache.kafka',名称:'kafka-clients',版本:'2.0.0'

因此,请检查您是否针对正确的分区(第一个分区为 0)

检查您的代理版本:https ://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html说:

0.11.0.0 版本的代理支持此操作

从同一应用程序生成消息,以确保您已正确连接。

您还可以考虑另一种选择。使用cleanup.policy=compact如果您的消息密钥重复,您可以从中受益。不仅因为该键的旧消息将被自动删除,而且您可以使用具有空负载的消息删除该键的所有消息这一事实。只是不要忘记将delete.retention.msmin.compaction.lag.ms设置为足够小的值。在这种情况下,您可以使用一条消息,而不是为同一密钥生成空有效负载(但请谨慎使用这种方法,因为这样您可以删除未使用的消息(使用该密钥))

于 2018-10-24T16:39:12.340 回答
1

尝试这个

DeleteRecordsResult result = adminClient.deleteRecords(recordsToDelete);
Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = result.lowWatermarks();
try {
    for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : lowWatermarks.entrySet()) {
        System.out.println(entry.getKey().topic() + " " + entry.getKey().partition() + " " + entry.getValue().get().lowWatermark());
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
adminClient.close();

在这段代码中,需要调用entry.getValue().get().lowWatermark(),因为 adminClient.deleteRecords(recordsToDelete) 返回的是 Futures 的映射,所以需要调用 get() 等待 Future 运行

于 2018-12-20T07:24:27.003 回答