我在 Spring Boot 应用中使用 Apache Camel,并尝试将 Kafka 偏移量保存到文件中。偏移量似乎已经写入文件,但启动时没有被读取,因此 Camel 在每次重启后都会从 Kafka 主题的开头重新开始读取。
/**
 * Camel route that consumes messages from a Kafka topic, logs each one and then
 * manually commits its offset.
 *
 * <p>Offsets are persisted through a file-backed {@link FileStateRepository}
 * registered as the Spring bean {@code fileStore} and referenced from the
 * endpoint URI via {@code offsetRepository=#fileStore}.
 */
@Component
public class Route extends RouteBuilder {

    /**
     * Defines the route: Kafka consumer -> INFO log -> manual offset commit.
     *
     * @throws Exception if the route definition cannot be built
     */
    @Override
    public void configure() throws Exception {
        from(kafka())
            .to("log:TEST?level=INFO")
            .process(Route::commitKafka);
    }

    /**
     * Builds the Kafka consumer endpoint URI.
     *
     * <p>Auto-commit is disabled and manual commit is enabled so that offsets are
     * only recorded once {@link #commitKafka(Exchange)} has run for a message.
     *
     * @return the endpoint URI consumed by {@link #configure()}
     */
    private String kafka() {
        return "kafka:" + "topic"
            + "?brokers=" + "localhost:9092"
            + "&groupId=" + "TEST"
            + "&autoOffsetReset=" + "earliest"
            + "&autoCommitEnable=" + false
            + "&allowManualCommit=" + true
            + "&offsetRepository=" + "#fileStore";
    }

    /**
     * Creates the file-backed offset repository used by the Kafka endpoint.
     *
     * <p>The repository is a Camel service whose lifecycle has to be driven
     * explicitly when it is created as a plain Spring bean: {@code start} is
     * bound as the init method so the repository is started before the route
     * uses it, and {@code stop} as the destroy method. Without the start call
     * the in-memory cache stays empty after a restart, so the consumer finds no
     * stored offset and falls back to {@code autoOffsetReset}.
     * NOTE(review): relies on FileStateRepository loading its file on start —
     * confirm against the Camel version in use.
     *
     * @return the repository backed by {@code /kafka/offset_repo/repo.dat}
     */
    @Bean(name = "fileStore", initMethod = "start", destroyMethod = "stop")
    private FileStateRepository fileStateRepository() {
        return FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
    }

    /**
     * Synchronously commits the offset of the message carried by the exchange.
     *
     * @param exchange the exchange produced by the Kafka consumer
     * @throws IllegalStateException if the exchange carries no manual-commit header
     */
    private static void commitKafka(Exchange exchange) {
        KafkaManualCommit manual =
            exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
        if (manual == null) {
            // Fail with a clear message instead of a bare NullPointerException;
            // the header is expected to be present because the endpoint sets
            // allowManualCommit=true (see kafka()).
            throw new IllegalStateException(
                "Missing header " + KafkaConstants.MANUAL_COMMIT
                    + "; cannot commit Kafka offset manually");
        }
        manual.commitSync();
    }
}