0

我是 Spring Batch 的新手,并实施了一个 Spring Batch 作业,它必须从 DB 中提取大量数据集并写入文件。以下是对我来说按预期工作的示例作业配置。

@Bean
public Job customDBReaderFileWriterJob() throws Exception {
    return jobBuilderFactory.get(MY_JOB)
            .incrementer(new RunIdIncrementer())
            .flow(partitionGenerationStep())
            .next(cleanupStep())
            .end()
            .build();
}

@Bean
public Step partitionGenerationStep() throws Exception {
    return stepBuilderFactory
            .get("partitionGenerationStep")
            .partitioner("Partitioner", partitioner())
            .step(multiOperationStep())
            .gridSize(50)
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Step multiOperationStep() throws Exception {
    return stepBuilderFactory
            .get("MultiOperationStep")
            .<Input, Output>chunk(100)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}

@Bean
@StepScope
public DBPartitioner partitioner() {
    DBPartitioner dbPartitioner = new DBPartitioner();
    dbPartitioner.setColumn(ID);
    dbPartitioner.setDataSource(dataSource);
    dbPartitioner.setTable(TABLE);
    return dbPartitioner;
}

@Bean
@StepScope
public Reader reader() {
    return new Reader();
}

@Bean
@StepScope
public Processor processor() {
    return new Processor();
}

@Bean
@StepScope
public Writer writer() {
    return new Writer();
}    

@Bean
public Step cleanupStep() {
    return stepBuilderFactory.get("cleanupStep")
            .tasklet(cleanupTasklet())
            .build();
}

@Bean
@StepScope
public CleanupTasklet cleanupTasklet() {
    return new CleanupTasklet();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(10);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("MultiThreaded-");
    return executor;
}    

由于数据集很大,我已将任务执行器的线程池值配置为 10,网格大小为 50。通过此设置,10 个线程一次写入 10 个文件,读取器以块的形式读取文件,因此读取器处理器和写入器流程正在迭代多次(对于一组 10,在移动到下一个分区之前)。

现在,我想添加一个tasklet,一旦一个线程的所有迭代(读取、处理、写入)完成,即在每个分区完成后,我可以在其中压缩文件。

我确实有一个清理 tasklet 最后运行,但是有压缩逻辑意味着首先获取从每个分区生成的所有文件,然后执行压缩。请建议。

4

1 回答 1

0

您可以将您的工作步骤更改multiOperationStepFlowStep面向块的步骤,然后是一个简单的小任务步骤,您可以在其中进行压缩。也就是说,worker step其实就是两个step合二为一FlowStep

于 2020-11-16T12:30:10.597 回答