3

我遵循了春季批处理文档,但无法让我的工作异步运行。

因此,我从 Web 容器运行作业,作业将通过 REST 端点触发。

我想在完成整个工作之前获取 JobInstance ID 以作为响应传递它。因此他们可以稍后使用 JobInstance ID 检查作业的状态,而不是等待。但我无法让它工作。下面是我尝试过的示例代码。请让我知道我错过了什么或错了什么。

BatchConfig 制作异步 JobLauncher

@Configuration
public class BatchConfig {

    @Autowired
    JobRepository jobRepository;


    @Bean
    public JobLauncher simpleJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}

控制器

@Autowired
JobLauncher jobLauncher;

@RequestMapping(value="/trigger-job", method = RequestMethod.GET)
public Long workHard() throws Exception {
    JobParameters jobParameters = new JobParametersBuilder().
            addLong("time", System.currentTimeMillis())
            .toJobParameters();
    JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);
    System.out.println(jobExecution.getJobInstance().getInstanceId());
    System.out.println("OK RESPONSE");
    return jobExecution.getJobInstance().getInstanceId();
}

和 JobBuilder 作为组件

@Component
public class BatchComponent {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    public Job customJob(String someParam) throws Exception {
        return jobBuilderFactory.get("personProcessor")
                .incrementer(new RunIdIncrementer()).listener(listener())
                .flow(personPorcessStep(someParam)).end().build();
    }


    private Step personPorcessStep(String someParam) throws Exception {
        return stepBuilderFactory.get("personProcessStep").<PersonInput, PersonOutput>chunk(1)
                .reader(new PersonReader(someParam)).faultTolerant().
                        skipPolicy(new DataDuplicateSkipper()).processor(new PersonProcessor())
                .writer(new PersonWriter()).build();
    }


    private JobExecutionListener listener() {
        return new PersonJobCompletionListener();
    }

    private class PersonInput {
        String firstName;

        public PersonInput(String firstName) {
            this.firstName = firstName;
        }

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    }

    private class PersonOutput {
        String firstName;

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    }

    public class PersonReader implements ItemReader<PersonInput> {
        private List<PersonInput> items;
        private int count = 0;

        public PersonReader(String someParam) throws InterruptedException {
            Thread.sleep(10000L); //to simulate processing
            //manipulate and provide data in the read method
            //just for testing i have given some dummy example
            items = new ArrayList<PersonInput>();
            PersonInput pi = new PersonInput("john");
            items.add(pi);
        }

        @Override
        public PersonInput read() {
            if (count < items.size()) {
                return items.get(count++);
            }
            return null;
        }
    }


    public class DataDuplicateSkipper implements SkipPolicy {

        @Override
        public boolean shouldSkip(Throwable exception, int skipCount) throws SkipLimitExceededException {
            if (exception instanceof DataIntegrityViolationException) {
                return true;
            }
            return true;
        }
    }


    private class PersonProcessor implements ItemProcessor<PersonInput, PersonOutput> {

        @Override
        public PersonOutput process(PersonInput item) throws Exception {
            return null;
        }
    }

    private class PersonWriter implements org.springframework.batch.item.ItemWriter<PersonOutput> {
        @Override
        public void write(List<? extends PersonOutput> results) throws Exception {
            return;
        }
    }

    private class PersonJobCompletionListener implements JobExecutionListener {
        public PersonJobCompletionListener() {
        }

        @Override
        public void beforeJob(JobExecution jobExecution) {

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            System.out.println("JOB COMPLETED");
        }
    }
}

主功能

@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
@EnableAsync
public class SpringBatchTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestApplication.class, args);
    }
}

我正在使用基于注释的配置,并将 gradle 与以下批处理包一起使用。

compile('org.springframework.boot:spring-boot-starter-batch')

如果需要更多信息,请告诉我。我找不到任何示例来运行这个常见用例。

谢谢你的时间。

4

7 回答 7

7

试试这个,在您的配置中,您需要使用@Bean(name = "myJobLauncher")创建带有SimpleAsyncTaskExecutor的 customJobLauncher ,同样将在您的控制器中使用@Qualifier 。

@Bean(name = "myJobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

在你的控制器中

@Autowired
@Qualifier("myJobLauncher")
private JobLauncher jobLauncher;
于 2019-03-23T11:04:05.763 回答
3

如果我查看您的代码,我会看到一些错误。首先,您的自定义配置未加载,因为如果已加载,对于同一接口的重复 bean 实例,注入将失败。

spring boot 有很多神奇之处,但如果你不告诉他做一些组件扫描,什么都不会像预期的那样加载。

我可以看到的第二个问题是您的 BatchConfig 类:它没有扩展 DefaultBatchConfigure,也没有覆盖 getJobLauncher(),因此即使启动魔法将加载所有内容,您也会获得默认值。这是一个可行的配置,它符合@EnableBatchProcessing API文档

批处理配置

@Configuration
@EnableBatchProcessing(modular = true)
@Slf4j
public class BatchConfig extends DefaultBatchConfigurer {

  @Override
  @Bean
  public JobLauncher getJobLauncher() {
    try {
      SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
      jobLauncher.setJobRepository(getJobRepository());
      jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
      jobLauncher.afterPropertiesSet();
      return jobLauncher;

    } catch (Exception e) {
      log.error("Can't load SimpleJobLauncher with SimpleAsyncTaskExecutor: {} fallback on default", e);
      return super.getJobLauncher();
    }
  }
}

主功能

@SpringBootApplication
@EnableScheduling
@EnableAsync
@ComponentScan(basePackageClasses = {BatchConfig.class})
public class SpringBatchTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestApplication.class, args);
    }
}
于 2019-09-19T16:34:20.753 回答
2

我知道这是一个老问题,但无论如何我都会为未来的用户发布这个答案。

查看您的代码后,我不知道您为什么会遇到此问题,但我可以建议您使用 Qualifier 注释并像这样使用 ThreadPoolTask​​Executor 并查看它是否解决了您的问题。

您还可以查看本教程:Asynchronous Spring Batch Job Processing了解更多详细信息。它将帮助您异步配置 spring 批处理作业。这个教程是我写的。

@Configuration
public class BatchConfig {

 @Autowired
 private JobRepository jobRepository;

 @Bean
 public TaskExecutor threadPoolTaskExecutor(){

  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(12);
        executor.setCorePoolSize(8);
        executor.setQueueCapacity(15);

   return executor;
 }

 @Bean
    public JobLauncher asyncJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();

        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(threadPoolTaskExecutor());
        return jobLauncher;
 }
}
于 2019-09-26T19:03:59.453 回答
1

JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);. Joblauncher 将在 Job 完成后等待返回任何内容,这就是为什么如果这是您的问题,您的服务可能需要很长时间才能响应。如果您想要异步功能,您可能需要查看 Spring 的@EnableAsync& @Async

@EnableAsync

于 2018-12-09T09:34:57.473 回答
1

尽管您有自定义,但您正在使用Spring 提供的jobLauncher默认值运行作业。jobLauncher你能simpleJobLauncher在你的控制器中自动接线并试一试吗?

于 2019-03-05T10:36:11.537 回答
1

根据 Spring 文档,要异步返回 http 请求的响应,需要使用 org.springframework.core.task.SimpleAsyncTaskExecutor。

spring TaskExecutor 接口的任何实现都可以用来控制作业的异步执行方式。

春季批处理文档

<bean id="jobLauncher"
  class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
</property>

于 2019-08-29T08:21:52.713 回答
0

如果您使用的是 Lombok,这可能会对您有所帮助:

TLDR: Lombok@AllArgsConstructor似乎不适用于@Qualifier注释 编辑:如果您@Qualifier在文件中启用注释lombok.config以便能够像这样@Qualifier使用@AllArgsConstructor

lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier

我知道老问题,但是我遇到了完全相同的问题,但没有一个答案能解决它。

我像这样配置了异步作业启动器并添加了限定符以确保注入此 jobLauncher:

 @Bean(name = "asyncJobLauncher")
 public JobLauncher simpleJobLauncher(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

然后像这样注入

@Qualifier("asyncJobLauncher")
private final JobLauncher jobLauncher;

在将 Lombok 更改为自动装配后,我正在使用 Lombok @AllArgsConstructor,正确的作业启动器被注入,作业现在异步执行:

@Autowired
@Qualifier("asyncJobLauncher")
private JobLauncher jobLauncher;

我也不必扩展我的配置DefaultBatchConfigurer

于 2021-02-11T16:47:38.523 回答