4

我需要创建一个恢复模式。在我的模式中,我只能在给定的时间窗口内启动工作。如果作业失败,它只会在下一个时间窗口重新启动,完成后我想启动为此窗口提前计划的计划作业。作业之间的唯一区别是时间窗口参数。

我考虑将 JobExecutionDecider 与 JobExplorer 结合使用或覆盖 Joblauncher。但一切似乎都太突兀了。

我没有找到一个符合我需要的例子,任何想法都会受到欢迎。

4

3 回答 3

3

只是为了回顾一下根据不完整代码提供的建议实际做了什么。我创建了一个类似于下面的恢复流程。恢复流程包含了我的实际批次,并且只负责为内部作业提供正确的作业参数。它可以是第一次执行时的初始参数,正常执行时的新参数或最后一次执行失败时的旧参数。

<batch:job id="recoveryWrapper"
       incrementer="wrapperRunIdIncrementer"
       restartable="true">
    <batch:decision id="recoveryFlowDecision" decider="recoveryFlowDecider">
        <batch:next on="FIRST_RUN" to="defineParametersOnFirstRun" />
        <batch:next on="RECOVER" to="recover.batchJob " />
        <batch:next on="CURRENT" to="current.batchJob " />
    </batch:decision>
    <batch:step id="defineParametersOnFirstRun" next="current.batchJob">
        <batch:tasklet ref="defineParametersOnFirstRunTasklet"/>
    </batch:step>
    <batch:step id="recover.batchJob " next="current.batchJob">
        <batch:job ref="batchJob" job-launcher="jobLauncher"
                            job-parameters-extractor="jobParametersExtractor" />
    </batch:step>
    <batch:step id="current.batchJob" >
       <batch:job ref="batchJob" job-launcher="jobLauncher"
                            job-parameters-extractor="jobParametersExtractor" />
    </batch:step>
</batch:job>

该解决方案的核心是使用Spring Batch Restart机制时的RecoveryFlowDeciderJobParametersExtractor 。RecoveryFlowDecider 将查询 JobExplorer 和 JobRepository 以了解我们在上次运行中是否失败。它将把最后一次执行放在包装器的执行上下文中,以便稍后在 JobParametersExtractor 中使用。请注意使用 runIdIncremeter 允许重新执行包装作业。

@Component
 public class RecoveryFlowDecider implements JobExecutionDecider {
        private static final String FIRST_RUN = "FIRST_RUN";
        private static final String CURRENT = "CURRENT";
        private static final String RECOVER = "RECOVER";

        @Autowired
        private JobExplorer jobExplorer;
        @Autowired
        private JobRepository jobRepository;

        @Override
        public FlowExecutionStatus decide(JobExecution jobExecution
                                         ,StepExecution stepExecution) {
            // the wrapper is named as the wrapped job + WRAPPER
            String wrapperJobName = jobExecution.getJobInstance().getJobName();
            String jobName;
             jobName = wrapperJobName.substring(0,wrapperJobName.indexOf(EtlConstants.WRAPPER));
            List<JobInstance> instances = jobExplorer.getJobInstances(jobName, 0, 1);
                JobInstance internalJobInstance = instances.size() > 0 ? instances.get(0) : null;

            if (null == internalJobInstance) {
                return new FlowExecutionStatus(FIRST_RUN);
            }
            JobExecution lastExecution = jobRepository.getLastJobExecution(internalJobInstance.getJobName()
    ,internalJobInstance.getJobParameters());
            //place the last execution on the context (wrapper context to use later)
            jobExecution.getExecutionContext().put(EtlConstants.LAST_EXECUTION, lastExecution);
                ExitStatus exitStatus = lastExecution.getExitStatus();

            if (ExitStatus.FAILED.equals(exitStatus) || ExitStatus.UNKNOWN.equals(exitStatus)) {
                return new FlowExecutionStatus(RECOVER);
            }else if(ExitStatus.COMPLETED.equals(exitStatus)){
                return new FlowExecutionStatus(CURRENT);    
            }
            //We should never get here unless we have a defect
            throw new RuntimeException("Unexpecded batch status: "+exitStatus+" in decider!");

        }

}

然后 JobParametersExtractor 将再次测试上次执行的结果,如果作业失败,它将提供用于执行失败作业的原始参数,从而触发 Spring Bacth 重启机制。否则,它将创建一组新的参数,并将按照他的正常路线执行。

    @Component
    public class JobExecutionWindowParametersExtractor implements
            JobParametersExtractor {
        @Override
        public JobParameters getJobParameters(Job job, StepExecution stepExecution) {

            // Read the last execution from the wrapping job
            // in order to build Next Execution Window
            JobExecution lastExecution= (JobExecution) stepExecution.getJobExecution().getExecutionContext().get(EtlConstants.LAST_EXECUTION);;

            if(null!=lastExecution){
                if (ExitStatus.FAILED.equals(lastExecution.getExitStatus())) {
                    JobInstance instance = lastExecution.getJobInstance();
                    JobParameters parameters = instance.getJobParameters();                 
                                return parameters;
                }
            }       
            //We do not have failed execution or have no execution at all we need to create a new execution window
            return buildJobParamaters(lastExecution,stepExecution);
        }
...
}
于 2014-01-11T19:46:28.230 回答
2

你考虑过JobStep吗?也就是说,一个步骤确定是否有任何其他作业要运行。这个值被设置到 StepExecutionContext 中。JobExecutionDecider 然后检查这个值;如果存在,则指向启动 Job 的 JobStep。

这是上面的文档http://docs.spring.io/spring-batch/reference/htmlsingle/#external-flows

于 2013-01-14T12:32:32.487 回答
0

是否有可能以相反的方式做到这一点?

在每个时间窗口中,提交针对该时间窗口的作业。

但是,作业的第一步应该检查上一个时间窗口中的作业是否成功完成。如果之前失败,则提交上一个作业,并等待完成,然后再进入自己的逻辑。

于 2013-01-14T08:55:55.620 回答