0

我正在尝试使用 Java 配置和 PostgreSQL 创建一个 Spring Batch 概念验证(POC)。

我已经成功地手动创建了那些在使用内存数据库时本可由 @EnableBatchProcessing 和 @EnableAutoConfiguration 自动提供的 bean。

但是,即使传入的是由同一个 JobExplorer bean 返回的 JobInstance,我也无法让这个 JobExplorer bean 返回对应的 JobExecution 列表。

我得到的错误是“无法反序列化执行上下文”,这似乎来自试图反序列化 JOB_EXECUTION_CONTEXT 表的“SHORT_CONTEXT”字段的方法。

我已经为所创建的 JobExplorer bean 传入了 DefaultExecutionContextSerializer。之后由于仍然收到该错误,我又传入了一个将“wrapAsLob”设置为 true 的 DefaultLobHandler。

/**
 * Exposes the job registry as a Spring bean.
 *
 * @return a newly constructed {@link MapJobRegistry}
 */
@Bean
public JobRegistry jobRegistry() {
    return new MapJobRegistry();
}

/**
 * Declares a {@link JobRegistryBeanPostProcessor} that is handed the
 * registry returned by {@link #jobRegistry()}.
 *
 * @return the configured post-processor
 */
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
    final JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
    final JobRegistry registry = jobRegistry();
    postProcessor.setJobRegistry(registry);
    return postProcessor;
}

/**
 * Assembles a {@link SimpleJobOperator} from the explorer, launcher,
 * registry and repository beans of this configuration.
 *
 * @return the fully wired operator
 */
@Bean
public JobOperator jobOperator() {
    final SimpleJobOperator operator = new SimpleJobOperator();

    // Collaborators are resolved in the same order as before so that the
    // order in which the bean methods are invoked does not change.
    final JobExplorer explorer = jobExplorer();
    operator.setJobExplorer(explorer);
    final JobLauncher launcher = jobLauncher();
    operator.setJobLauncher(launcher);
    final JobRegistry registry = jobRegistry();
    operator.setJobRegistry(registry);
    final JobRepository repository = jobRepository();
    operator.setJobRepository(repository);

    return operator;
}

/**
 * Builds the {@link JobExplorer} used to read job meta-data from the
 * {@code batch_}-prefixed tables of {@link #dataSource()}.
 *
 * <p>The explorer reads execution contexts with a
 * {@link DefaultExecutionContextSerializer} and accesses LOB columns through
 * a {@link DefaultLobHandler} with {@code wrapAsLob} enabled.
 *
 * @return the explorer produced by the factory bean
 * @throws IllegalStateException if the factory bean cannot be initialised
 *         or cannot create the explorer
 */
@Bean
public JobExplorer jobExplorer() {
    JobExplorerFactoryBean factory = new JobExplorerFactoryBean();
    factory.setDataSource(dataSource());
    factory.setJdbcOperations(jdbcTemplate);
    factory.setTablePrefix("batch_");
    // NOTE(review): this serializer has to understand whatever format the
    // job repository wrote into the execution-context tables; a mismatch would
    // surface as a deserialization error when executions are loaded - confirm
    // which JobRepository bean actually writes these rows.
    factory.setSerializer(new DefaultExecutionContextSerializer());

    DefaultLobHandler lobHandler = new DefaultLobHandler();
    lobHandler.setWrapAsLob(true);
    factory.setLobHandler(lobHandler);

    try {
        // The factory is instantiated by hand, so the container never runs
        // its initialisation callback; invoke it explicitly before use.
        factory.afterPropertiesSet();
        return factory.getObject();
    } catch (Exception e) {
        // Fail fast instead of registering a null bean that only breaks
        // later, far away from the real cause.
        throw new IllegalStateException("Unable to create JobExplorer", e);
    }
}

/**
 * Primary {@link DataSource} of the application, bound to the
 * {@code spring.datasource} configuration properties.
 *
 * @return the data source created by {@link DataSourceBuilder}
 */
@Bean
@Primary
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource dataSource() {
    final DataSource built = DataSourceBuilder.create().build();
    return built;
}

/**
 * Builds the {@link JobRepository} that persists job meta-data in the
 * {@code batch_}-prefixed tables of {@link #dataSource()}.
 *
 * <p>The repository is configured for the {@code POSTGRES} database type and
 * writes execution contexts with a {@link DefaultExecutionContextSerializer}.
 *
 * @return the repository produced by the factory bean
 * @throws IllegalStateException if the factory bean cannot create the repository
 */
@Bean
public JobRepository jobRepository() {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource());
    factory.setDatabaseType("POSTGRES");
    // NOTE(review): a ResourcelessTransactionManager is paired with a
    // JDBC-backed repository here - confirm this is intended rather than a
    // transaction manager bound to the data source.
    factory.setTransactionManager(new ResourcelessTransactionManager());
    factory.setSerializer(new DefaultExecutionContextSerializer());
    factory.setTablePrefix("batch_");

    try {
        return (JobRepository) factory.getObject();
    } catch (Exception e) {
        // Fail fast instead of registering a null bean that only breaks
        // later, far away from the real cause.
        throw new IllegalStateException("Unable to create JobRepository", e);
    }
}

下面是我的 REST 控制器中的 GET 方法,我试图在其中生成失败的作业执行(JobExecution)列表:

// Spring Batch collaborators injected into this controller by field.
@Autowired JobLauncher jobLauncher;
@Autowired JobRegistry jobRegistry;
@Autowired JobOperator jobOperator;
@Autowired JobExplorer jobExplorer;



/**
 * Lists the {@code FAILED} executions of every job known to the registry,
 * grouped by job name.
 *
 * <p>For each registered job name, up to 1000 job instances are read from
 * the explorer, and the executions of each instance are filtered by
 * {@link BatchStatus#FAILED}. A job name is present in the result as soon as
 * executions could be loaded for one of its instances, even if none failed.
 * Progress is echoed to standard output as before.
 *
 * @return map from job name to its failed executions, or {@code null} when a
 *         required collaborator is missing or the lookup throws
 */
@GetMapping("batch/failedJobs")
public Map<String, List<JobExecution>> getFailedJobs() {
    if (jobRegistry == null || jobOperator == null || jobExplorer == null) {
        System.out.println("job registry, operator or explorer is null");
        return null;
    }

    try {
        Map<String, List<JobExecution>> failedByJob = new HashMap<String, List<JobExecution>>();

        for (String jobName : jobRegistry.getJobNames()) {
            // The start index is zero-based; starting at 1 would silently
            // skip the first instance returned for the job.
            jobExplorer.getJobInstances(jobName, 0, 1000).forEach(jobInstance -> {
                System.out.println("jobName: " + jobName + " instance: " + jobInstance);

                // Query the executions once and reuse the result for both
                // the diagnostics and the filtering below.
                List<JobExecution> executions = jobExplorer.getJobExecutions(jobInstance);
                if (executions == null) {
                    System.out.println("Could not get jobExecution for jobName " + jobName + " jobInstance: " + jobInstance);
                    return;
                }

                System.out.println("" + executions);
                List<JobExecution> failed =
                        failedByJob.computeIfAbsent(jobName, key -> new ArrayList<JobExecution>());
                for (JobExecution execution : executions) {
                    System.out.println("jobName: " + jobName + " instance: " + jobInstance
                            + " jobExecution: " + execution);
                    // Collect matches directly: a Stream is not a Collection,
                    // so casting the filtered stream fails at runtime.
                    if (execution.getStatus() == BatchStatus.FAILED) {
                        failed.add(execution);
                    }
                }
            });
        }

        return failedByJob;
    } catch (Exception e) {
        // NOTE(review): only the message is reported, so the stack trace is
        // lost; log the throwable itself once the logger API is confirmed.
        System.out.println(e.getMessage());
        logger.info(e.getMessage());
    }
    return null;
}
4

1 回答 1

0

我通过更改为 Jackson2 序列化程序解决了类似的问题:

// Swap the explorer factory's execution-context serializer for the Jackson2-based one.
jefb.setSerializer(new Jackson2ExecutionContextStringSerializer());

你可以试试。

于 2018-04-16T08:38:25.860 回答