0

我知道您可以通过配置JobLancherwith来异步启动工作TaskExecutor

@Configuration
public class BatchConfig extends DefaultBatchConfigurer {

    @Bean
    public TaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(200);
        executor.initialize();
        return executor;
    }

    @Override
    @Bean
    public JobLauncher getJobLauncher() {
        SimpleJobLauncher jobLauncher = null;
        try {
            jobLauncher = new SimpleJobLauncher();
            jobLauncher.setJobRepository(getJobRepository());
            jobLauncher.setTaskExecutor(getAsyncExecutor());
            jobLauncher.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace(...)
        }
        return jobLauncher;
    }
}

但是,鉴于我有以下服务作为示例:

@Service
public class ProcessorService implements InitializingBean {

    private JobLauncher jobLauncher;
    private JobExplorer jobExplorer;
    private JobLocator jobLocator;

    public ProcessorService(final JobLocator jobLocator,
                         final JobExplorer jobExplorer,
                         final JobLauncher jobLauncher) {
        this.jobLocator = jobLocator;
        this.jobExplorer = jobExplorer;
        this.jobLauncher = jobLauncher;
    }
    
    public JobResultResponse process(JobRequest request, String trackingUUID)  {
        Job job = getJob(request);
        JobParameters parameters = buildJobParams(request, trackingUUID);
        JobExecution execution = kickOfJob(job, request, parameters);
        return provideAResponse(request, trackingUUID, execution); //previously return response but now perhaps just update the record in the database when it becomes async
    }

    private JobExecution kickOfJob(Job job, JobRequest request, JobParameters parameters) {
        JobExecution execution;
        try {
            execution = jobLauncher.run(job, parameters);         
            if (execution.getStatus() != COMPLETED) {
                //log exception
            }
        } catch (JobExecutionAlreadyRunningException e) {
            //****
        }
        return execution;
    }
    
}

和调用process()工作的控制器:

@RestController
@RequestMapping("/test")
public class TestController {

    private ProcessorService processor;
    private RequestDataRepo repo;

    public TestController(final ProcessorService  processor, final RequestDataRepo repo) {
        this.processor = processor;
        this.repo = repo;
    }

    @PostMapping(value = "/kickOfJob", consumes = MediaType.APPLICATION_JSON_VALUE)
    public HttpEntity<MyReponse> trigger(@Valid @RequestBody JobRequest request) {

        final String trackingId = UUID.randomUUID().toString();

        final RequestEntity entity = RequestEntity.builder()
                .trackingId(trackingId)
                .name(request.getName())
                .build();

        this.repo.save(entity);

        processor.process(request, trackingId); //async call, get execution id
        
        //create the response object for immediate return


        return new ResponseEntity<>(response, CREATED);
    }

}

如何process()在后台异步运行,以便能够response立即返回创建?我已经TaskExecutor在我的应用程序中创建了一个异步 bean。我只是不确定如何调用process()异步。我需要整个process()异步,而不仅仅是作业的运行。

4

0 回答 0