0

我正在使用 Spring Boot 和 Apache Camel,试图将 Kafka 偏移量保存到文件中。偏移量似乎已经写入文件,但启动时并没有被读取,因此 Camel 在重启后会从 Kafka 主题的开头重新读取。

/**
 * Camel route that consumes from a Kafka endpoint, logs every exchange and
 * then commits the offset manually.
 *
 * <p>The endpoint URI refers to the {@code fileStore} bean declared in this
 * class as its offset repository.
 */
@Component
public class Route extends RouteBuilder {

    /**
     * Wires the route: Kafka consumer, then INFO-level log endpoint, then
     * manual commit.
     */
    @Override
    public void configure() throws Exception {
        from(kafka())
                .to("log:TEST?level=INFO")
                .process(Route::commitKafka);
    }

    /**
     * Assembles the Kafka consumer endpoint URI.
     *
     * @return the URI with auto-commit disabled, manual commit allowed and
     *         {@code #fileStore} set as the offset repository
     */
    private String kafka() {
        return "kafka:" + "topic"
                + "?brokers=" + "localhost:9092"
                + "&groupId=" + "TEST"
                + "&autoOffsetReset=" + "earliest"
                + "&autoCommitEnable=" + false
                + "&allowManualCommit=" + true
                + "&offsetRepository=" + "#fileStore";
    }

    /**
     * Declares the file-backed repository referenced by the endpoint URI as
     * {@code #fileStore}.
     *
     * @return a repository backed by {@code /kafka/offset_repo/repo.dat}
     */
    @Bean(name = "fileStore")
    private FileStateRepository fileStateRepository() {
        // NOTE(review): the original author reports that getCache() on this
        // repository is empty at this point; start() is never invoked on it
        // anywhere in this class.
        return FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
    }

    /**
     * Commits synchronously using the manual-commit handle carried in the
     * exchange's {@code KafkaConstants.MANUAL_COMMIT} header.
     *
     * @param exchange the exchange currently being processed
     */
    private static void commitKafka(Exchange exchange) {
        exchange.getIn()
                .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class)
                .commitSync();
    }
}
4

1 回答 1

1

我终于找到了解决方案(文档中并未提及):必须显式调用 start 方法,才能在启动时初始化 repo。

/**
 * Creates the file-backed state repository registered as the
 * {@code fileStore} bean and starts it before returning it.
 *
 * <p>{@code start()} is invoked explicitly here so that the repository is
 * initialised at application startup.
 *
 * @return the started repository backed by {@code /kafka/offset_repo/repo.dat}
 * @throws IllegalStateException if the repository cannot be started
 */
@Bean(name = "fileStore")
private FileStateRepository fileStateRepository() {
    FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));

    try {
        fileStateRepository.start();
    } catch (Exception e) {
        // Fail fast: returning a repository that did not start would silently
        // hide the failure, so surface it with the original cause attached.
        throw new IllegalStateException(
                "Failed to start offset repository /kafka/offset_repo/repo.dat", e);
    }

    return fileStateRepository;
}
于 2018-08-15T11:27:35.887 回答