
I am trying to use MultiResourceItemReader in Spring Batch to read multiple files, and I have also added a taskExecutor so that the records within each file are read in multiple threads. Suppose a folder contains 3 csv files: MultiResourceItemReader is supposed to process them one at a time, but because of the taskExecutor, different threads pick up different csv files, e.g. two csv files from the same folder are picked up by two threads and start being processed at the same time.

Expectation: MultiResourceItemReader should read the first file, then the taskExecutor should spawn different threads and process it. Only after that should the next file be picked up and processed by the taskExecutor.

Code snippet / Batch_Configuration:

@Bean
public Step Step1() {
    return stepBuilderFactory.get("Step1")
            .<POJO, POJO>chunk(5)
            .reader(multiResourceItemReader())
            .writer(writer())
            .taskExecutor(taskExecutor())
            .throttleLimit(throttleLimit)
            .build();
}

@Bean
public MultiResourceItemReader<POJO> multiResourceItemReader() 
{
    MultiResourceItemReader<POJO> resourceItemReader = new MultiResourceItemReader<POJO>();
    ClassLoader cl = this.getClass().getClassLoader();
    ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(cl);

    Resource[] resources;
    try {
        resources = resolver.getResources("file:/temp/*.csv");
        resourceItemReader.setResources(resources);
        resourceItemReader.setDelegate(itemReader());
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    return resourceItemReader;
}
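
The delegate itemReader() is not shown above; for context, a minimal sketch of what it might look like, assuming a plain FlatFileItemReader over the POJO (the column names below are hypothetical). Note that no resource is set on the delegate, because MultiResourceItemReader hands it each file in turn:

@Bean
public FlatFileItemReader<POJO> itemReader() {
    // No resource is set here: MultiResourceItemReader passes each file to this delegate.
    return new FlatFileItemReaderBuilder<POJO>()
            .name("pojoItemReader")
            .linesToSkip(1)                       // assuming a header line in each csv
            .delimited()
            .names("field1", "field2", "field3")  // hypothetical column names
            .targetType(POJO.class)
            .build();
}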

1 Answer


Maybe you should check out the Partitioner approach (https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#partitioning) and implement something like the following:

@Bean
public Step mainStep(StepBuilderFactory stepBuilderFactory,
                     FlatFileItemReader itemReader,
                     ListDelegateWriter listDelegateWriter,
                     BatchProperties batchProperties) {
    return stepBuilderFactory.get(Steps.MAIN)
            .<POJO, POJO>chunk(pageSize)
            .reader(itemReader)
            .writer(listDelegateWriter)
            .build();
}

@Bean
public TaskExecutor jobTaskExecutor(@Value("${batch.config.core-pool-size}") Integer corePoolSize,
                                    @Value("${batch.config.max-pool-size}") Integer maxPoolSize) {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(corePoolSize);
    taskExecutor.setMaxPoolSize(maxPoolSize);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}

@Bean
@StepScope
public Partitioner partitioner(@Value("#{jobExecutionContext['ResourcesToRead']}") String[] resourcePaths) {
    Resource[] resourceList = Arrays.stream(resourcePaths)
            .map(FileSystemResource::new)
            .toArray(Resource[]::new);
    // MultiResourcePartitioner creates one partition per resource and ignores the grid size;
    // the partition step invokes partition() itself, so there is no need to call it here.
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    partitioner.setResources(resourceList);
    return partitioner;
}

@Bean
public Step masterStep(StepBuilderFactory stepBuilderFactory, Partitioner partitioner, Step mainStep, TaskExecutor jobTaskExecutor) {
    return stepBuilderFactory.get(BatchConstants.MASTER)
            .partitioner(Steps.MAIN, partitioner)  // one worker step execution per file
            .step(mainStep)
            .taskExecutor(jobTaskExecutor)
            .build();
}
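
The part this leaves implicit is how each partition gets its file: MultiResourcePartitioner stores each resource URL in the step execution context under the key fileName by default, so the reader used by mainStep has to be step-scoped and pull its resource from there. A minimal sketch, reusing the POJO type from the question (the column names are hypothetical):

@Bean
@StepScope
public FlatFileItemReader<POJO> itemReader(@Value("#{stepExecutionContext['fileName']}") Resource resource) {
    // Each partitioned step execution is handed exactly one file by MultiResourcePartitioner.
    return new FlatFileItemReaderBuilder<POJO>()
            .name("pojoItemReader")
            .resource(resource)
            .linesToSkip(1)                       // assuming a header line in each csv
            .delimited()
            .names("field1", "field2", "field3")  // hypothetical column names
            .targetType(POJO.class)
            .build();
}

With this setup each worker thread from jobTaskExecutor processes one complete file, so files are no longer interleaved inside a single shared reader. If you also want the records within a file to be processed by multiple threads, you can additionally give mainStep its own taskExecutor, at the cost of restartability of the reader state.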
Answered 2020-07-30T19:13:54.730