0

我正在尝试创建一个简单的备份作业处理,以验证我的目标是否正确。

我正在尝试使用弹簧批量测试来验证结果。

PS 我的批处理使用框架提供的非默认配置,因为我们的作业存储库应该使用非默认模式名称。

我的作业的读取步骤配置为带有@StepScope注释的延迟初始化,这是必需的,因为我的作业应该有一些参数来在读取步骤中查询数据库

这是我们正在使用的示例配置它位于根包中,其余批处理配置位于子包中

@Configuration
@Import({ApplicationHibernateConfiguration.class})
@ComponentScan
public class ApplicationBatchConfiguration extends DefaultBatchConfigurer {

    private static final String BATCH_PROCESSING_PREFIX = "BATCH_PROCESSING.BATCH_";
    private final DataSource dataSource;
    private final PlatformTransactionManager transactionManager;
    private final JobLauncher jobLauncher;
    private final JobRepository jobRepository;
    private final JobExplorer jobExplorer;

    @Autowired
    public GlobalLogisticsPortalBatchConfiguration(
            DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        this.dataSource = dataSource;
        this.transactionManager = transactionManager;
        this.jobRepository = createJobRepository();
        this.jobLauncher = createJobLauncher();
        this.jobExplorer = createJobExplorer();
    }


    @Override
    protected JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Override
    protected JobRepository createJobRepository() throws Exception {
        JobRepositoryFactoryBean factoryBean = new JobRepositoryFactoryBean();
        factoryBean.setDatabaseType("DB2");
        factoryBean.setTablePrefix(BATCH_PROCESSING_PREFIX);
        factoryBean.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
        factoryBean.setDataSource(this.dataSource);
        factoryBean.setTransactionManager(this.transactionManager);
        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }

    @Override
    protected JobExplorer createJobExplorer() throws Exception {
        JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
        factoryBean.setDataSource(this.dataSource);
        factoryBean.setTablePrefix(BATCH_PROCESSING_PREFIX);
        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }

    @Override
    @Bean
    public JobRepository getJobRepository() {
        return jobRepository;
    }

    @Override
    public PlatformTransactionManager getTransactionManager() {
        return transactionManager;
    }

    @Override
    @Bean
    public JobLauncher getJobLauncher() {
        return jobLauncher;
    }

    @Override
    @Bean
    public JobExplorer getJobExplorer() {
        return jobExplorer;
    }

    @Bean
    public JobBuilderFactory jobBuilderFactory(JobRepository jobRepository) {
        return new JobBuilderFactory(jobRepository);
    }

    @Bean
    public StepBuilderFactory stepBuilderFactory(
            JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilderFactory(jobRepository, transactionManager);
    }


}

我尝试使用的步骤如下所示:

    @Bean
    @StepScope
    public JdbcPagingItemReader<DomainObject> itemReader(
            @Value("#{jobParameters['id']}") String id) {
        JdbcPagingItemReader<DomainObject> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(10);
        Db2PagingQueryProvider nativeQueryProvider = new Db2PagingQueryProvider();
        nativeQueryProvider.setSelectClause("*");
        nativeQueryProvider.setFromClause("from SCHEMA.DOMAIN");
        nativeQueryProvider.setWhereClause("id = :id");
        Map<String, Object> params = new HashMap<>(1);
        params.put("id", id);
        reader.setRowMapper((rs, rowNum) -> {
            DomainObject element = new DomainObject();
            element.setId(rs.getString("ID"));
            return element;
        });
        reader.setParameterValues(params);
        reader.setQueryProvider(nativeQueryProvider);
        return reader;
    }

    @Bean
    public Step fetchDomain() throws Exception {
        return stepBuilderFactory.get("fetchDomain")
                .<HierarchyElement, HierarchyElement>chunk(10)
                .faultTolerant()
                .reader(itemReader(null))
                .writer(items -> items.forEach(System.out::println))
                .build();
    }

实际的作业 bean 当前配置为仅启动单步

    @Bean
    public Job backupJob() throws Exception {
        return jobBuilderFactory.get("backupJob")
                .start(fetchHeid())
                .build();
    }

我的测试代码如下所示

@RunWith(SpringRunner.class)
@SpringBatchTest
@ContextConfiguration(classes = {ApplicationBatchConfiguration.class})
public class BackupJobConfigurationTest {
    @Autowired
    @Qualifier(value = "backupJob")
    public Job job;

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void flowTest() throws Exception {
        JobParameters parameters = new JobParametersBuilder()
                .addString("id", "124")
                .toJobParameters();

        JobExecution execution = jobLauncherTestUtils.launchJob(parameters);

        assertEquals(BatchStatuses.COMPLETED, execution.getExitStatus().getExitCode()); //failed
    }
}

我希望退出代码为“已完成”并获得“未知”。此外,我不确定代码实际上是否被调用,因为我没有看到 writer lambda 的任何输出。

我在测试中看到的唯一输出是

Aug 30, 2019 2:52:17 PM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=backupJob]] launched with the following parameters: [{id=124}]

org.junit.ComparisonFailure: 
Expected :COMPLETED
Actual   :UNKNOWN
4

1 回答 1

0

我知道,首先,我需要从我的配置中删除 SimpleAsyncTaskExecutor 以在同一个线程中实际测试代码,然后通过更仔细地阅读参考我重新配置了我的批处理配置,而不是直接扩展 BatchConfigurer 我已在我的配置中将其配置为 bean 并添加 @EnableSpringBatch 注释

    @Bean
    public BatchConfigurer batchConfigurer() {
    return new DefaultBatchConfigurer() {
        @Override
        protected JobRepository createJobRepository() throws Exception {
            JobRepositoryFactoryBean factoryBean = new JobRepositoryFactoryBean();
            factoryBean.setDatabaseType("db2");
            factoryBean.setTablePrefix(BATCH_PROCESSING_PREFIX);
            factoryBean.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
            factoryBean.setDataSource(dataSource);
            factoryBean.setTransactionManager(transactionManager);
            factoryBean.afterPropertiesSet();
            return factoryBean.getObject();
        }

        @Override
        protected JobExplorer createJobExplorer() throws Exception {
            JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
            factoryBean.setDataSource(dataSource);
            factoryBean.setTablePrefix(BATCH_PROCESSING_PREFIX);
            factoryBean.afterPropertiesSet();
            return factoryBean.getObject();
        }

        @Override
        protected JobLauncher createJobLauncher() throws Exception {
            SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
            jobLauncher.setJobRepository(getJobRepository());
            //jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
            jobLauncher.afterPropertiesSet();
            return jobLauncher;
        }
    };
}
于 2019-08-30T15:16:34.783 回答