1

我正在使用 Spring 云流读取文件并使用文件拆分器拆分并使用 DSL 样式将每一行作为消息发出,正在读取的文件有一个标题行,只是想知道是否有一种简单的方法可以在 / 之前跳过标题行看完后。

任何帮助表示赞赏。

这是我的拆分器和集成流的样子:

enter code here
  return IntegrationFlows
            .from("....")
            .split(Files.splitter(true, true)/
            .charset(StandardCharsets.UTF_8)
            .applySequence(true), //emmit sequenceNumber to header
             e -> e.id("fileSplitter")
            );


enter code here
    IntegrationFlow integrationFlow = integrationFlowBuilder
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(FileSplitter.FileMarker.class, "markers.input")
                    .channelMapping(String.class, "lines.input"))
            .get();
4

2 回答 2

1

如果我没看错,您正在使用我们的 OOB 应用程序之一,文件来源:https ://github.com/spring-cloud-stream-app-starters/file/blob/master/spring-cloud-starter-stream- source-file/README.adoc并使用 Spring Cloud Dataflow dsl 进行部署,例如stream create file ----file.consumer.mode=lines --file.directory=/tmp/ | sink正确吗?

如果是这样,sequence_number当您以行模式读取文件时,会调用一个特殊的标题。您可以在两者之间添加一个过滤器,以根据标头表达式删除这些消息。

于 2017-08-29T18:21:57.420 回答
0

Spring Integration 5.1.5 解决方案:

@Bean
public MessageSource<File> sourceDirectory() {
    FileReadingMessageSource messageSource = new FileReadingMessageSource();
    messageSource.setDirectory(new File("./data/input"));
    return messageSource;
}

@Bean
public IntegrationFlow folderFlow() {
    FileSplitter fileSplitter = new FileSplitter();
    fileSplitter.setFirstLineAsHeader("columns");
    return IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(1000)))
            .split(fileSplitter)
            .handle(System.out::println)
            .get();
}
于 2019-10-09T09:30:20.237 回答