如何保证一个Job在运行时,不允许同时再次运行?
我们有 BJ 需要 1 小时来处理提要并填充临时表。此 BJ 的第一步是清除临时表并开始填充主商店前台表中的数据。
考虑一个场景,当 BJ 启动(第一次运行)时,如果我们再次启动 BJ,它将作为第一步的一部分从临时表中删除内容。
所以请建议我如何保持第二次执行直到第一次没有完成?
如何保证一个Job在运行时,不允许同时再次运行?
我们有 BJ 需要 1 小时来处理提要并填充临时表。此 BJ 的第一步是清除临时表并开始填充主商店前台表中的数据。
考虑一个场景,当 BJ 启动(第一次运行)时,如果我们再次启动 BJ,它将作为第一步的一部分从临时表中删除内容。
所以请建议我如何保持第二次执行直到第一次没有完成?
您可以创建自定义Tasklet作为第一步,并在其中使用JobExecutionDao来查找所有JobExecutions。如果有多个运行- 抛出异常。
我敢肯定,这不是最好的解决方案,但我希望这对您的情况有所帮助。
执行作业时,请确保始终使用相同的参数运行作业。在您的作业成功执行完成后,配置您的调用脚本以删除与批处理作业执行相对应的条目。
这样,它会给出错误,并且不允许您同时运行同一作业的 2 次执行。删除将确保允许串行执行。
替代方法:
使用单个参数编写您的作业job-execution-id
。每次执行作业之前,请从作业的批处理表中查询已完成作业的最大值。现在,以 1 作为输入参数job-execution-id
执行作业。job-execution-id
我认为这是比上面更好的方法。我不确定 springbatch 本身是否提供了任何简单的选项来实现这个场景。
也许我误解了您的问题,但是您可以通过throttle-limit
在 Step 中的 Tasklet 上指定 a 来限制任何单个步骤的并行执行次数。指定一个应该确保您一次只有一个执行:
<batch:step id="stepA" next="stepB">
<batch:tasklet throttle-limit="1">
<batch:chunk reader="myReader" writer="myWriter" commit-interval="100"/>
</batch:tasklet>
</batch:step>
在单个 JVM 中实现这一点应该可以使用二进制信号量来实现。这将有助于避免并行执行同一作业。如果您不希望它在设置信号量值的情况下简单地跳过执行,那么让第二个实例等待会有点棘手。
您可以使用合适的“Leader Election”实现进行更复杂的序列化(包括跨 Spring 批处理节点)。我在我的项目中使用了 Netflix Curator(一个 Apache Zookeeper 配方)。这里有一些指示:https ://github.com/regunathb/Trooper/wiki/Useful-Batch-Libraries
您可以设置 spring-batch-Admin UI 以查看作业的状态(失败/运行/完成等)。通过正确设置 Spring Batch Admin UI,您甚至可以查看不同作业中多个任务的状态。
您可以添加 JobExecutionListener 的自定义实现。
下面是示例监听器实现:
@Component
public class JobExecutionListener implements JobExecutionListener{
@Autowired
private JobExplorer jobExplorer;
@Override
public void beforeJob(JobExecution jobExecution) {
int runningJobsCount = jobExplorer.findRunningJobExecutions(jobExecution.getJobInstance().getJobName()).size();
if(runningJobsCount > 1){
throw new RuntimeException("There are already active running instances of this job, Please cancel those executions first.");
}
}
@Override
public void afterJob(JobExecution jobExecution) {
}
}
如果您有该作业的任何实例已经在运行,这会将当前的作业启动标记为失败。您可以根据您的业务需求处理此异常。
我通过编写特殊的增量器来做到这一点,该增量器仅在之前的作业执行完成时才增加属性。
public class CompletedJobRunIdIncrementer extends RunIdIncrementer {
private final JobRepository jobRepository;
private final String jobName;
public CompletedJobRunIdIncrementer(JobRepository jobRepository, String jobName) {
this.jobRepository = jobRepository;
this.jobName = jobName;
}
@Override
public JobParameters getNext(JobParameters parameters) {
JobExecution lastJobExecution = jobRepository.getLastJobExecution(jobName, parameters);
return lastJobExecution == null || lastJobExecution.getStatus() == BatchStatus.COMPLETED ? super.getNext(parameters) : parameters;
}
}
和这个增量器的工作:
jobBuilders.get("myJob").incrementer(new CompletedJobRunIdIncrementer(jobRepository, "myJob").start(someTask()).build()