我知道您可以通过配置JobLancher
with来异步启动工作TaskExecutor
:
@Configuration
public class BatchConfig extends DefaultBatchConfigurer {
@Bean
public TaskExecutor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.initialize();
return executor;
}
@Override
@Bean
public JobLauncher getJobLauncher() {
SimpleJobLauncher jobLauncher = null;
try {
jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.setTaskExecutor(getAsyncExecutor());
jobLauncher.afterPropertiesSet();
} catch (Exception e) {
e.printStackTrace(...)
}
return jobLauncher;
}
}
但是,鉴于我有以下服务作为示例:
@Service
public class ProcessorService implements InitializingBean {
private JobLauncher jobLauncher;
private JobExplorer jobExplorer;
private JobLocator jobLocator;
public ProcessorService(final JobLocator jobLocator,
final JobExplorer jobExplorer,
final JobLauncher jobLauncher) {
this.jobLocator = jobLocator;
this.jobExplorer = jobExplorer;
this.jobLauncher = jobLauncher;
}
public JobResultResponse process(JobRequest request, String trackingUUID) {
Job job = getJob(request);
JobParameters parameters = buildJobParams(request, trackingUUID);
JobExecution execution = kickOfJob(job, request, parameters);
return provideAResponse(request, trackingUUID, execution); //previously return response but now perhaps just update the record in the database when it becomes async
}
private JobExecution kickOfJob(Job job, JobRequest request, JobParameters parameters) {
JobExecution execution;
try {
execution = jobLauncher.run(job, parameters);
if (execution.getStatus() != COMPLETED) {
//log exception
}
} catch (JobExecutionAlreadyRunningException e) {
//****
}
return execution;
}
}
和调用process()
工作的控制器:
@RestController
@RequestMapping("/test")
public class TestController {
private ProcessorService processor;
private RequestDataRepo repo;
public TestController(final ProcessorService processor, final RequestDataRepo repo) {
this.processor = processor;
this.repo = repo;
}
@PostMapping(value = "/kickOfJob", consumes = MediaType.APPLICATION_JSON_VALUE)
public HttpEntity<MyReponse> trigger(@Valid @RequestBody JobRequest request) {
final String trackingId = UUID.randomUUID().toString();
final RequestEntity entity = RequestEntity.builder()
.trackingId(trackingId)
.name(request.getName())
.build();
this.repo.save(entity);
processor.process(request, trackingId); //async call, get execution id
//create the response object for immediate return
return new ResponseEntity<>(response, CREATED);
}
}
如何process()
在后台异步运行,以便能够response
立即返回创建?我已经TaskExecutor
在我的应用程序中创建了一个异步 bean。我只是不确定如何调用process()
异步。我需要整个process()
异步,而不仅仅是作业的运行。