
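I am new to Spring Batch and have been running it single-threaded. Now I need to add multithreading step by step, so I set up the configuration below, but parallel processing hangs after a while: after some records are processed, nothing more appears on the console. For the single-threaded version I used JdbcCursorItemReader, and I then switched to JdbcPagingItemReader to get a thread-safe reader. The reader reads entries from a Postgres DB. The processor (which calls another REST web service and passes the response to the writer) and the writer (which creates a new file and updates status data in the DB) can run in parallel.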

  @Bean
  public Job job(JobBuilderFactory jobBuilderFactory,
      StepBuilderFactory stepBuilderFactory,
      ItemReader<OrderRequest> itemReader,
      ItemProcessor<OrderRequest, OrderResponse> dataProcessor,
      ItemWriter<OrderResponse> fileWriter, JobExecutionListener jobListener,
      ItemReadListener<OrderRequest> stepItemReadListener,
      SkipListener<OrderRequest, OrderResponse> stepSkipListener, TaskExecutor taskExecutor) {


    Step step1 = stepBuilderFactory.get("Process-Data")
        .<OrderRequest, OrderResponse>chunk(10)
        .listener(stepItemReadListener)
        .reader(itemReader)
        .processor(dataProcessor)
        .writer(fileWriter)
        .faultTolerant()
        .processorNonTransactional()
        .skipLimit(5)
        .skip(CustomException.class)
        .listener(stepSkipListener)
        .taskExecutor(taskExecutor)
        .throttleLimit(5)
        .build();

    return jobBuilderFactory.get("Batch-Job")
        .incrementer(new RunIdIncrementer())
        .listener(jobListener)
        .start(step1)
        .build();
  }
  
  @StepScope
  @Bean
  public JdbcPagingItemReader<OrderRequest> jdbcPagingItemReader(@Qualifier("postgresDataSource") DataSource dataSource,
      @Value("#{jobParameters[customerId]}") String customerId, OrderRequestRowMapper rowMapper) {

    // reading database records using JDBC in a paging fashion

    JdbcPagingItemReader<OrderRequest> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(dataSource);
    reader.setFetchSize(1000);
    reader.setRowMapper(rowMapper);

    // Sort Keys
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("OrderRequestID", Order.ASCENDING);

    // Postgres implementation of a PagingQueryProvider using database specific features.

    PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
    queryProvider.setSelectClause("*");
    queryProvider.setFromClause("FROM OrderRequest");
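    // bind customerId as a named parameter instead of concatenating it into the SQL string
    queryProvider.setWhereClause("CUSTOMER = :customerId");
    reader.setParameterValues(Collections.<String, Object>singletonMap("customerId", customerId));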
    queryProvider.setSortKeys(sortKeys);
    reader.setQueryProvider(queryProvider);
    return reader;
  }
 
  @StepScope
  @Bean
  public SynchronizedItemStreamReader<OrderRequest> itemReader(JdbcPagingItemReader<OrderRequest> jdbcPagingItemReader) {
    return new SynchronizedItemStreamReaderBuilder<OrderRequest>().delegate(jdbcPagingItemReader).build();
  }
  
  @Bean
  public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    taskExecutor.setMaxPoolSize(5);
    taskExecutor.setQueueCapacity(0);
    return taskExecutor;
  }

  @StepScope
  @Bean
  ItemProcessor<OrderRequest, OrderResponse> dataProcessor() {
    return new BatchDataFileProcessor();
  }

  @StepScope
  @Bean
  ItemWriter<OrderResponse> fileWriter() {
    return new BatchOrderFileWriter();
  }


  @StepScope
  @Bean
  public ItemReadListener<OrderRequest> stepItemReadListener() {
    return new StepItemReadListener();
  }


  @Bean
  public JobExecutionListener jobListener() {
    return new JobListener();
  }

  @StepScope
  @Bean
  public SkipListener<OrderRequest, OrderResponse> stepSkipListener() {
    return new StepSkipListener();
  }
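The configuration above wraps JdbcPagingItemReader in a SynchronizedItemStreamReader. The idea behind such a wrapper can be sketched in plain Java: a lock around the delegate's read() so that concurrent worker threads never advance the same reader at once, and each item is handed to exactly one thread. SimpleReader, ListReader and SynchronizedReader below are hypothetical stand-ins, not Spring Batch classes, and this is not the library's source; it only illustrates why the read side is safe, not what caused the hang.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class SynchronizedReaderSketch {

    // Hypothetical minimal reader contract: returns null when exhausted,
    // mirroring the ItemReader convention of signalling end-of-input with null.
    interface SimpleReader<T> {
        T read();
    }

    // Non-thread-safe delegate: a plain iterator over an in-memory list.
    static final class ListReader<T> implements SimpleReader<T> {
        private final Iterator<T> it;
        ListReader(List<T> items) { this.it = items.iterator(); }
        public T read() { return it.hasNext() ? it.next() : null; }
    }

    // Wrapper that serialises read() with a lock, so only one thread
    // advances the delegate at a time.
    static final class SynchronizedReader<T> implements SimpleReader<T> {
        private final SimpleReader<T> delegate;
        private final ReentrantLock lock = new ReentrantLock();
        SynchronizedReader(SimpleReader<T> delegate) { this.delegate = delegate; }
        public T read() {
            lock.lock();
            try {
                return delegate.read();
            } finally {
                lock.unlock();
            }
        }
    }

    // Reads all items with `threads` workers sharing one reader; returns every item read.
    static List<Integer> readConcurrently(int itemCount, int threads) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < itemCount; i++) source.add(i);
        SimpleReader<Integer> reader = new SynchronizedReader<>(new ListReader<>(source));
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("readers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> seen = readConcurrently(1000, 5);
        System.out.println(seen.size() + " items, " + new HashSet<>(seen).size() + " distinct");
    }
}
```

With the lock in place every item is read exactly once, so main prints `1000 items, 1000 distinct`.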

What is wrong with the multithreading configuration here? With JdbcCursorItemReader and no TaskExecutor bean, the batch works fine, processing one record at a time:

  @StepScope
  @Bean
  public JdbcCursorItemReader<OrderRequest> jdbcCursorItemReader(@Qualifier("postgresDataSource") DataSource dataSource,
      @Value("#{jobParameters[customerId]}") String customerId, OrderRequestRowMapper rowMapper) {

    return new JdbcCursorItemReaderBuilder<OrderRequest>()
        .name("jdbcCursorItemReader")
        .dataSource(dataSource)
        .queryArguments(customerId)
        .sql(CommonConstant.FETCH_QUERY)
        .rowMapper(rowMapper)
        .saveState(true)
        .build();
  }

1 Answer


After changing the TaskExecutor as follows, it now works:

@Bean
  public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    // concurrencyLimit is not defined in this snippet; presumably a configured value
    taskExecutor.setConcurrencyLimit(concurrencyLimit);
    return taskExecutor;
  }

I still haven't figured out what the original problem was.
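One plausible explanation, which is an assumption and was not confirmed in the thread: with setQueueCapacity(0), ThreadPoolTaskExecutor backs its pool with a SynchronousQueue, so a task submitted while all five threads are still busy is rejected instead of queued, whereas SimpleAsyncTaskExecutor with a concurrency limit makes the submitting thread wait for a free slot. The plain-JDK sketch below only contrasts those two submission behaviours; ExecutorThrottleSketch and its methods are hypothetical names, and it does not reproduce Spring Batch's step execution.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorThrottleSketch {

    // 5 threads and no queue: the ThreadPoolExecutor shape that ThreadPoolTaskExecutor
    // builds for corePoolSize = maxPoolSize = 5 and queueCapacity = 0.
    // Returns true if a 6th task submitted while all 5 threads are busy is rejected.
    static boolean zeroQueuePoolRejectsWhenBusy() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch started = new CountDownLatch(5);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(() -> {
                    started.countDown();
                    awaitQuietly(release); // keep this worker busy until released
                });
            }
            started.await(); // all 5 workers are now occupied
            try {
                pool.execute(() -> { });
                return false;
            } catch (RejectedExecutionException e) {
                return true; // no idle thread, no queue slot: the default AbortPolicy rejects
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    // Semaphore throttle: the submitting thread waits for a permit instead of failing,
    // and each task gets a new thread, similar in spirit to SimpleAsyncTaskExecutor
    // with a concurrencyLimit. Returns {tasks completed, max tasks seen running at once}.
    static int[] throttledRunAll(int tasks, int limit) {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        Thread[] threads = new Thread[tasks];
        try {
            for (int i = 0; i < tasks; i++) {
                permits.acquire(); // blocks here once `limit` tasks are running
                threads[i] = new Thread(() -> {
                    try {
                        maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
                        Thread.sleep(5); // simulate work such as a REST call
                        completed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        permits.release();
                    }
                });
                threads[i].start();
            }
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {completed.get(), maxRunning.get()};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("zero-queue pool rejected 6th task: " + zeroQueuePoolRejectsWhenBusy());
        int[] r = throttledRunAll(20, 5);
        System.out.println("completed=" + r[0] + ", max concurrent=" + r[1]);
    }
}
```

The first method shows the pool failing fast when no thread is idle; the second shows the caller simply waiting, so every task eventually runs and no more than `limit` run at once.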

answered 2021-07-27T11:25:22.427