我有一个用例,我需要在多个线程中运行读写器处理器,主要是为了提高性能,并且工作一致。
@Bean
public Job batchJob() {
return this.jobBuilderFactory.get("ccmbatchjob")
.start(fileInitializationStep())
.on("FAILED")
.to(summaryFileWriterStep())
.from(fileInitializationStep())
.on("*")
.to(fileMetadataValidationStep())
.from(fileMetadataValidationStep())
.on("FAILED")
.to(summaryFileWriterStep())
.from(fileMetadataValidationStep())
.on("*")
.to(redisInitializationStep())
.from(redisInitializationStep())
.next(convertandPushRecord())
.next(resultMonitoringStep())
.next(resultConsolidatorStep())
.next(summaryFileWriterStep())
.next(zipResultStep())
.next(returnResultStep())
.next(cleanBatchJob()).end()
.build();
}
@Bean
public Step convertandPushRecord() {
return stepBuilders.get("convertandPushRecord")
.<DataInputFileLine, ProducerRecord<String, String>>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(10);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("MultiThreaded-");
return executor;
}
convertandPushRecord() 步骤由“MultipleThreaded-*”线程执行,其中“fileInitializationStep()”或“fileMetadataValidationStep()”等步骤由主线程或 JobLauncher 线程执行,如果配置如下:
protected JobLauncher createJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
ThreadPoolTaskExecutor tpe = new ThreadPoolTaskExecutor();
tpe.setMaxPoolSize(1);
tpe.initialize();
jobLauncher.setTaskExecutor(tpe);// decide max pool size ??
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
有时(例如 10 次中的 1 次)我遇到的问题是步骤未由“MultiThreaded-*”线程执行,而不是由 joblauncher 线程或主线程执行,它会导致下游整个工作流出错。
这是春季批次的问题还是我错过了一些构造?