1

我正在尝试从可能出现一个或多个文件的系统文件夹中轮询文件,对于这些文件,我必须只触发一次批处理作业,而不是次数等于文件夹中的文件数。在我的情况下,我的批处理一次处理多个文件,我只希望轮询器将信号发送到批处理一次以开始其工作。

尝试了 poller.maxMessagesPerPoll(1) 等,但它有所不同。我面临的问题是,批处理作业被触发等于轮询文件夹中的文件数。我只想执行一次批处理

@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setJob(fileMessageBatchJob);
    return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
    return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    System.out.println("&&&&&&&&&&&&&&&&&&Inside Integration Flow!!!!");
    return IntegrationFlows
            .from(Files.inboundAdapter(new File("C:\\apps_data\\recv")),
                    c -> c.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(1)))
            .filter(onlyT4F2())
            .handle(fileMessageToJobRequest)
            .handle(jobLaunchingGateway)
            .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").get();
}

@Bean
public GenericSelector<File> onlyT4F2() {
    System.out.println("@@@@@@@Inside GenericSelector of XXX");
    return new GenericSelector<File>() {
        @Override
        public boolean accept(File source) {
          return source.getName().contains("XXX");
        }
    };
}

当前行为 -当轮询器检测到给定位置的文件/文件时,配置的批处理作业会触发多次。如果文件为 4,则批处理作业触发 4 次。

预期行为 -在文件轮询之后,批处理作业应该只对任意数量的文件执行一次。因为批处理作业一次处理多个文件,因此不需要多次执行。

让我知道您是否需要我提供的任何其他信息。请优先提供帮助

4

2 回答 2

0

@Gary Russell - 问题已解决,仅使用 GenericSelector 如下。谢谢你的帮助。在第一次运行时触发批处理作业后,它会处理所有当前文件并将其移动到其他文件夹,因此我添加了 file.exists() 并且它按照我的期望运行良好。但我观察到,在 1 小时后或有时即使在提供了预期的文件后轮询也没有发生,同样需要您的帮助/建议。

@Bean
public GenericSelector<File> triggerJobOnlyOnce() {
    return new GenericSelector<File>() {
        @Override
        public boolean accept(File source) {
                if(source.getName().contains("XXX") && source.exists())
                    return true;
            return flag;
        }
    };
}
于 2019-07-16T07:13:01.713 回答
0

您可以FileListFilter在只返回一个文件的入站通道适配器上使用自定义。

.filter(myFilterThatOnlyReturnsOneFile)

编辑

public class OnlyOneFileListFilter implements FileListFilter<File> {

    @Override
    public List<File> filterFiles(File[] files) {
        return Collections.singletonList(files[0]);
    }

}
        return IntegrationFlows
                .from(Files.inboundAdapter(new File("C:\\apps_data\\recv"))
                            .filter(new OnlyOneFileListFilter()),
                        c -> c.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(1)))
                ...
于 2019-07-15T15:07:02.887 回答