我是 Spring Batch 的新手,并实施了一个 Spring Batch 作业,它必须从 DB 中提取大量数据集并写入文件。以下是对我来说按预期工作的示例作业配置。
@Bean
public Job customDBReaderFileWriterJob() throws Exception {
return jobBuilderFactory.get(MY_JOB)
.incrementer(new RunIdIncrementer())
.flow(partitionGenerationStep())
.next(cleanupStep())
.end()
.build();
}
@Bean
public Step partitionGenerationStep() throws Exception {
return stepBuilderFactory
.get("partitionGenerationStep")
.partitioner("Partitioner", partitioner())
.step(multiOperationStep())
.gridSize(50)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Step multiOperationStep() throws Exception {
return stepBuilderFactory
.get("MultiOperationStep")
.<Input, Output>chunk(100)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
@Bean
@StepScope
public DBPartitioner partitioner() {
DBPartitioner dbPartitioner = new DBPartitioner();
dbPartitioner.setColumn(ID);
dbPartitioner.setDataSource(dataSource);
dbPartitioner.setTable(TABLE);
return dbPartitioner;
}
@Bean
@StepScope
public Reader reader() {
return new Reader();
}
@Bean
@StepScope
public Processor processor() {
return new Processor();
}
@Bean
@StepScope
public Writer writer() {
return new Writer();
}
@Bean
public Step cleanupStep() {
return stepBuilderFactory.get("cleanupStep")
.tasklet(cleanupTasklet())
.build();
}
@Bean
@StepScope
public CleanupTasklet cleanupTasklet() {
return new CleanupTasklet();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(10);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("MultiThreaded-");
return executor;
}
由于数据集很大,我已将任务执行器的线程池值配置为 10,网格大小为 50。通过此设置,10 个线程一次写入 10 个文件,读取器以块的形式读取文件,因此读取器处理器和写入器流程正在迭代多次(对于一组 10,在移动到下一个分区之前)。
现在,我想添加一个tasklet,一旦一个线程的所有迭代(读取、处理、写入)完成,即在每个分区完成后,我可以在其中压缩文件。
我确实有一个清理 tasklet 最后运行,但是有压缩逻辑意味着首先获取从每个分区生成的所有文件,然后执行压缩。请建议。