1

我正在尝试以批处理方式使用 Spring Integration 处理一系列文件。我有这个非常旧的 xml,它试图将消息转换为作业

    <int:transformer ref="messageToJobTransformer"/>
    <batch-int:job-launching-gateway job-launcher="jobLauncher"/>

messageToJobTransformer是一个可以将 Message 转换为 Job 的类。问题是我不知道这个文件现在在哪里,我也不想要一个 xml 配置。我希望它是纯 Java DSL。这是我的简单配置。

return IntegrationFlows.from(Files.inboundAdapter(directory)
                    .preventDuplicates()
                    .patternFilter("*.txt")))
                    .handle(jobLaunchingGw())                       
                    .get();

这是我的网关 bean。

@Autowired
private JobLauncher jobLauncher;

@Bean
public MessageHandler jobLaunchingGw() {
    return new JobLaunchingGateway(jobLauncher);   
}

编辑:更新批处理配置类。

@Configuration
    @EnableBatchProcessing
public class BatchConfig

{

@Autowired
private JobBuilderFactory jobs;

@Autowired
private StepBuilderFactory steps;




@Bean
        public ItemReader<String> reader(@Value({jobParameters['input.file.name']}") String filename) throws MalformedURLException
    {
        FlatFileItemReader<String> reader = new FlatFileItemReader<String>();       
        return reader;
    }
@Bean
public Job job() throws MalformedURLException
{
    return jobs.get("job").start(step()).build();
}

@Bean
public Step step() throws MalformedURLException
{
    return steps.get("step").<String, String> chunk(5).reader(reader())
    .writer(writer()).build();
}

@Bean
public ItemWriter<String> writer(@Value("#{jobParameters['input.file.name']}")
{
    FlatFileItemWriter writer = new FlatFileItemWriter();
    return writer;
}



}
4

1 回答 1

2

你的问题不清楚。JobLaunchingGateway期望JobLaunchRequest作为payload. _

由于您的集成流程从 开始Files.inboundAdapter(directory),我可以假设您有一些工作定义。所以,你需要的是一些可以解析文件并返回的类JobLaunchRequest

Spring Batch参考手册中的类似内容:

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

在将该类定义为 a 之后,@Bean您可以.transform().handle(jobLaunchingGw()).

更新

@Bean
public FileMessageToJobRequest fileMessageToJobRequest(Job job) {
     FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
     fileMessageToJobRequest.setJob(job);
     fileMessageToJobRequest.setfileParameterName("file");
     return fileMessageToJobRequest;
}
...

@Bean
public IntegrationFlow flowToBatch(FileMessageToJobRequest fileMessageToJobRequest) {

      return IntegrationFlows
            .from(Files.inboundAdapter(directory)
                    .preventDuplicates()
                    .patternFilter("*.txt")))
            .transform(fileMessageToJobRequest)
            .handle(jobLaunchingGw())                       
                    .get();

}
于 2016-03-16T18:36:08.003 回答