我对 Spring Batch 很陌生,并尝试使用单线程运行 Spring Batch。现在我需要逐步添加多线程并具有以下配置,但是并行处理在一段时间后挂起,并且在处理一些记录后控制台上没有任何跟踪。之前对于单线程,我使用 JdbcCursorItemReader,然后切换到 JdbcPagingItemReader 用于线程安全读取器。Reader 正在从 postgres DB 中读取条目,然后处理器(调用其他 rest web 服务并将响应返回给 writer)和 writer(在 DB 中创建新文件并更新状态数据)可以并行执行。
@Bean
public Job job(JobBuilderFactory jobBuilderFactory,
StepBuilderFactory stepBuilderFactory,
ItemReader<OrderRequest> itemReader,
ItemProcessor<OrderRequest, OrderResponse> dataProcessor,
ItemWriter<OrderResponse> fileWriter, JobExecutionListener jobListener,
ItemReadListener<OrderRequest> stepItemReadListener,
SkipListener<OrderRequest, OrderResponse> stepSkipListener, TaskExecutor taskExecutor) {
Step step1 = stepBuilderFactory.get("Process-Data")
.<OrderRequest, OrderResponse>chunk(10)
.listener(stepItemReadListener)
.reader(itemReader)
.processor(dataProcessor)
.writer(fileWriter)
.faultTolerant()
.processorNonTransactional()
.skipLimit(5)
.skip(CustomException.class)
.listener(stepSkipListener)
.taskExecutor(taskExecutor)
.throttleLimit(5)
.build();
return jobBuilderFactory.get("Batch-Job")
.incrementer(new RunIdIncrementer())
.listener(jobListener)
.start(step1)
.build();
}
@StepScope
@Bean
public JdbcPagingItemReader<OrderRequest> jdbcPagingItemReader(@Qualifier("postgresDataSource") DataSource dataSource,
@Value("#{jobParameters[customerId]}") String customerId, OrderRequestRowMapper rowMapper) {
// reading database records using JDBC in a paging fashion
JdbcPagingItemReader<OrderRequest> reader = new JdbcPagingItemReader<>();
reader.setDataSource(dataSource);
reader.setFetchSize(1000);
reader.setRowMapper(rowMapper);
// Sort Keys
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("OrderRequestID", Order.ASCENDING);
// Postgres implementation of a PagingQueryProvider using database specific features.
PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
queryProvider.setSelectClause("*");
queryProvider.setFromClause("FROM OrderRequest");
queryProvider.setWhereClause("CUSTOMER = '" + customerId + "'");
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);
return reader;
}
@StepScope
@Bean
public SynchronizedItemStreamReader<OrderRequest> itemReader(JdbcPagingItemReader<OrderRequest> jdbcPagingItemReader) {
return new SynchronizedItemStreamReaderBuilder<OrderRequest>().delegate(jdbcPagingItemReader).build();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(5);
taskExecutor.setQueueCapacity(0);
return taskExecutor;
}
@StepScope
@Bean
ItemProcessor<OrderRequest, OrderResponse> dataProcessor() {
return new BatchDataFileProcessor();
}
@StepScope
@Bean
ItemWriter<OrderResponse> fileWriter() {
return new BatchOrderFileWriter();
}
@StepScope
@Bean
public ItemReadListener<OrderRequest> stepItemReadListener() {
return new StepItemReadListener();
}
@Bean
public JobExecutionListener jobListener() {
return new JobListener();
}
@StepScope
@Bean
public SkipListener<OrderRequest, OrderResponse> stepSkipListener() {
return new StepSkipListener();
}
这里的多线程配置有什么问题?当使用 JdbcCursorItemReader 并且没有 TaskExecutor bean 时,批处理一次可以很好地处理单个记录:
@StepScope
@Bean
public JdbcCursorItemReader<OrderRequest> jdbcCursorItemReader(@Qualifier("postgresDataSource") DataSource dataSource,
@Value("#{jobParameters[customerId]}") String customerId, OrderRequestRowMapper rowMapper) {
return new JdbcCursorItemReaderBuilder<OrderRequest>()
.name("jdbcCursorItemReader")
.dataSource(dataSource)
.queryArguments(customerId)
.sql(CommonConstant.FETCH_QUERY)
.rowMapper(rowMapper)
.saveState(true)
.build();
}