0

我有一个用例,我需要在多个线程中运行读写器处理器,主要是为了提高性能,并且工作一致。

 @Bean
    public Job batchJob() {
        return this.jobBuilderFactory.get("ccmbatchjob")                        
                        .start(fileInitializationStep())
                        .on("FAILED")
                            .to(summaryFileWriterStep())
                        .from(fileInitializationStep()) 
                        .on("*")
                            .to(fileMetadataValidationStep())
                        .from(fileMetadataValidationStep()) 
                        .on("FAILED")
                            .to(summaryFileWriterStep())
                        .from(fileMetadataValidationStep())     
                        .on("*")
                            .to(redisInitializationStep())
                        .from(redisInitializationStep())
                        .next(convertandPushRecord())
                        .next(resultMonitoringStep())
                        .next(resultConsolidatorStep())
                        .next(summaryFileWriterStep())
                        .next(zipResultStep())
                        .next(returnResultStep())
                        .next(cleanBatchJob()).end()
                        .build();
    }

  @Bean
    public Step convertandPushRecord() {
        return stepBuilders.get("convertandPushRecord")
            .<DataInputFileLine, ProducerRecord<String, String>>chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .taskExecutor(taskExecutor())
            .build();
    }
    
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(10);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("MultiThreaded-");
        return executor;
    }

convertandPushRecord() 步骤由“MultipleThreaded-*”线程执行,其中“fileInitializationStep()”或“fileMetadataValidationStep()”等步骤由主线程或 JobLauncher 线程执行,如果配置如下:

    protected JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        ThreadPoolTaskExecutor tpe = new ThreadPoolTaskExecutor();
        tpe.setMaxPoolSize(1);
        tpe.initialize();
        jobLauncher.setTaskExecutor(tpe);// decide max pool size ??
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

有时(例如 10 次中的 1 次)我遇到的问题是步骤未由“MultiThreaded-*”线程执行,而不是由 joblauncher 线程或主线程执行,它会导致下游整个工作流出错。

这是春季批次的问题还是我错过了一些构造?

4

0 回答 0