1

如何使用 java dsl Integrationflows 从 spring 集成中触发 spring 批处理作业。

我有下面的代码轮询目录中的文件,当一个新文件添加到目录中时,会生成一条消息,我想在那个实例上触发一个 spring 批处理作业。请指教。

@Bean
public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                              @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                              TaskExecutor taskExecutor,
                                              MessageSource<File> fileReadingMessageSource) {

    return IntegrationFlows.from(fileReadingMessageSource,
            c -> c.poller(Pollers.fixedDelay(period)
                    .taskExecutor(taskExecutor)
                    .maxMessagesPerPoll(maxMessagesPerPoll)))
              .transform(Transformers.fileToString())
                     .channel(ApplicationConfiguration.INBOUND_CHANNEL)                 

             .get();
}
4

2 回答 2

0

Spring Batch参考手册中有关于此事的干净样本:

@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            handle(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}
于 2018-01-22T19:42:47.603 回答
0

以下是我的代码:-

@Bean
public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                              @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                              TaskExecutor taskExecutor,
                                              MessageSource<File> fileReadingMessageSource,
                                              JobLaunchingGateway jobLaunchingGateway) {

    return IntegrationFlows.from(fileReadingMessageSource,
            c -> c.poller(Pollers.fixedDelay(period)
                    .taskExecutor(taskExecutor)
                    .maxMessagesPerPoll(maxMessagesPerPoll)))
              .handle(fileMessageToJobRequest(),"toRequest")
                       .handle(jobLaunchingGateway)
                        .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                             .get();
}
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
  //  fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
  //  simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}
于 2018-01-23T18:06:02.087 回答