
I am using Spring Batch to parse files, and I have the following scenario:

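I am running a job that has to parse a given file. For some unexpected reason (say, a power outage) the server goes down and I have to restart the machine. After the server comes back up, I want to resume the job from the point where it stopped before the outage. In other words, if the system had read 1,300 of 10,000 lines, it should now start reading from line 1,301.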

How can I achieve this with Spring Batch?

About the configuration: I use spring-integration to poll a directory for new files. When a file arrives, spring-integration launches the Spring Batch job, which parses the file with a FlatFileItemReader.


Here is a complete solution for restarting a job after a JVM crash.

1. Make the job restartable by setting restartable="true":

<job id="jobName" xmlns="http://www.springframework.org/schema/batch" restartable="true">

2. Code to restart the job:

import java.util.Date;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;

public class RestartJob {

    @Autowired
    private JobExplorer jobExplorer;
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private JobOperator jobOperator;

    public void restart(){
        try {
            List<JobInstance> jobInstances = jobExplorer.getJobInstances("jobName",0,1);// this will get one latest job from the database
            if(CollectionUtils.isNotEmpty(jobInstances)){
               JobInstance jobInstance =  jobInstances.get(0);
               List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
               if(CollectionUtils.isNotEmpty(jobExecutions)){
                   for(JobExecution execution: jobExecutions){
                       // If the job status is STARTED then update the status to FAILED and restart the job using JobOperator.java
                       if(execution.getStatus().equals(BatchStatus.STARTED)){ 
                           execution.setEndTime(new Date());
                           execution.setStatus(BatchStatus.FAILED);                               
                           execution.setExitStatus(ExitStatus.FAILED);                               
                           jobRepository.update(execution);
                           jobOperator.restart(execution.getId());
                       }
                   }
               }
            }
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }
}

3. Bean configuration:

<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean" p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" p:lobHandler-ref="oracleLobHandler"/>

<bean id="oracleLobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler"/>


<bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean" p:dataSource-ref="dataSource" />

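<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

<!-- Registers the Job beans in jobRegistry; SimpleJobOperator.restart() looks the job up there by name -->
<bean class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor" p:jobRegistry-ref="jobRegistry" />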

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
        <property name="taskExecutor" ref="jobLauncherTaskExecutor" /> 
</bean>

<task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />

<bean id="jobOperator" class="org.springframework.batch.core.launch.support.SimpleJobOperator" p:jobLauncher-ref="jobLauncher" p:jobExplorer-ref="jobExplorer" p:jobRepository-ref="jobRepository" p:jobRegistry-ref="jobRegistry"/>
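Restarting is enough to continue from roughly the right line because FlatFileItemReader stores the number of lines it has read in the step's ExecutionContext. That context is persisted with each chunk commit, and on restart the reader skips that many lines. The job therefore resumes right after the last committed chunk, which can be slightly before the exact line where the power failed. The plain-Java simulation below models this behaviour. It uses no Spring Batch classes; the map, the `linesRead` key and the method names are hypothetical stand-ins for the persisted execution context.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of chunk-based checkpoint/restart (not the Spring Batch API).
public class ChunkRestartDemo {

    // Hypothetical stand-in for the persisted step ExecutionContext; it survives the "crash".
    static final Map<String, Integer> executionContext = new HashMap<>();

    // Reads lines in chunks and "commits" each chunk together with the number of lines read so far.
    // If crashAfter >= 0, the run stops after reading that many lines and the open chunk is lost.
    // Returns the lines committed during this run.
    static List<String> runStep(List<String> lines, int chunkSize, int crashAfter) {
        List<String> committed = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        int start = executionContext.getOrDefault("linesRead", 0); // 0 on the first run
        int readThisRun = 0;
        for (int i = start; i < lines.size(); i++) {
            if (readThisRun == crashAfter) {
                return committed; // simulated power failure
            }
            chunk.add(lines.get(i));
            readThisRun++;
            if (chunk.size() == chunkSize || i == lines.size() - 1) {
                committed.addAll(chunk);                  // write the chunk...
                executionContext.put("linesRead", i + 1); // ...and save the checkpoint with it
                chunk.clear();
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> file = new ArrayList<>();
        for (int n = 1; n <= 10_000; n++) {
            file.add("line " + n);
        }
        List<String> firstRun = runStep(file, 100, 1_350); // crash in the middle of the 14th chunk
        List<String> secondRun = runStep(file, 100, -1);   // restart after the reboot
        System.out.println("committed before crash: " + firstRun.size());
        System.out.println("restart began at: " + secondRun.get(0));
        System.out.println("total committed: " + (firstRun.size() + secondRun.size()));
    }
}
```

With a chunk size of 100 and a failure after 1,350 lines were read, this prints 1300, `line 1301` and 10000. The 50 uncommitted lines are simply read again, so the commit interval bounds how much work is repeated after a crash.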
answered 2016-03-25T17:50:10.510

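An updated solution for Spring Batch 4. It uses the JVM start time to detect broken jobs, i.e. executions that are still marked STARTED but were created before the current JVM started. Note that this does not work in a clustered environment where several servers launch jobs.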

@Bean
public ApplicationListener<ContextRefreshedEvent> resumeJobsListener(JobOperator jobOperator, JobRepository jobRepository,
        JobExplorer jobExplorer) {
    // restart jobs that were left in STARTED state when the previous JVM died
    return event -> {
        Date jvmStartTime = new Date(ManagementFactory.getRuntimeMXBean().getStartTime());

        // for each job
        for (String jobName : jobExplorer.getJobNames()) {
            // get latest job instance
            for (JobInstance instance : jobExplorer.getJobInstances(jobName, 0, 1)) {
                // for each of the executions
                for (JobExecution execution : jobExplorer.getJobExecutions(instance)) {
                    if (execution.getStatus().equals(BatchStatus.STARTED) && execution.getCreateTime().before(jvmStartTime)) {
                        // this job is broken and must be restarted
                        execution.setEndTime(new Date());
                        execution.setStatus(BatchStatus.FAILED);
                        execution.setExitStatus(ExitStatus.FAILED);

                        for (StepExecution se : execution.getStepExecutions()) {
                            if (se.getStatus().equals(BatchStatus.STARTED)) {
                                se.setEndTime(new Date());
                                se.setStatus(BatchStatus.FAILED);
                                se.setExitStatus(ExitStatus.FAILED);
                                jobRepository.update(se);
                            }
                        }

                        jobRepository.update(execution);
                        try {
                            jobOperator.restart(execution.getId());
                        }
                        catch (JobExecutionException e) {
                            LOG.warn("Couldn't resume job execution {}", execution, e);
                        }
                    }
                }
            }
        }
    };
}
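The detection rule in this listener can be isolated into a small predicate. The sketch below is a hypothetical helper, not part of Spring Batch; it takes the status as a plain string instead of `BatchStatus` so it runs without Spring on the classpath. It also shows why the approach breaks in a cluster: an execution launched by another node before this JVM started satisfies the same condition even though it is still running.

```java
import java.lang.management.ManagementFactory;
import java.time.Instant;

// Hypothetical helper mirroring the listener's check (not a Spring Batch API).
public class OrphanCheck {

    // An execution is treated as orphaned if it is still STARTED but was created
    // before this JVM started, so no thread in this JVM can be running it.
    static boolean isOrphaned(String status, Instant createTime, Instant jvmStart) {
        return "STARTED".equals(status) && createTime.isBefore(jvmStart);
    }

    public static void main(String[] args) {
        Instant jvmStart = Instant.ofEpochMilli(ManagementFactory.getRuntimeMXBean().getStartTime());
        // Created an hour before this JVM started and still STARTED: left over from a crash.
        System.out.println(isOrphaned("STARTED", jvmStart.minusSeconds(3600), jvmStart));
        // Created after this JVM started: it may legitimately be running right now.
        System.out.println(isOrphaned("STARTED", jvmStart.plusSeconds(5), jvmStart));
        // Finished executions are never restarted.
        System.out.println(isOrphaned("COMPLETED", jvmStart.minusSeconds(3600), jvmStart));
    }
}
```

Running it prints `true`, `false`, `false`.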
answered 2018-06-01T09:28:06.227

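In your case, I would create a step that records the last processed line of the file. Then create a second job that reads this record and starts processing from that line number.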

That way, if the job stops for any reason, you can run the new job and it will resume the processing.
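A minimal sketch of the record-and-resume idea in plain Java, without Spring Batch: the last processed line number is written to a checkpoint file after every line, and a later run skips everything up to that number. The class, method and checkpoint-file names are hypothetical, and Java 11+ is assumed for `Files.readString`/`Files.writeString`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical file-based checkpointing; not part of Spring Batch.
public class LineCheckpoint {

    // Returns the last processed line number stored in the checkpoint file, or 0 if there is none.
    static long readCheckpoint(Path checkpoint) throws IOException {
        if (!Files.exists(checkpoint)) {
            return 0;
        }
        return Long.parseLong(Files.readString(checkpoint).trim());
    }

    // Processes the lines after the recorded checkpoint, updating it after every line.
    // If maxLines >= 0, stops after that many lines to simulate a crash. Returns lines processed.
    static long process(Path input, Path checkpoint, Consumer<String> handler, long maxLines)
            throws IOException {
        long last = readCheckpoint(checkpoint);
        long processedThisRun = 0;
        long lineNo = 0;
        try (BufferedReader reader = Files.newBufferedReader(input, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                lineNo++;
                if (lineNo <= last) {
                    continue; // already handled by an earlier run
                }
                if (processedThisRun == maxLines) {
                    break; // simulated crash
                }
                handler.accept(line);
                Files.writeString(checkpoint, Long.toString(lineNo)); // record progress
                processedThisRun++;
            }
        }
        return processedThisRun;
    }

    public static void main(String[] args) throws IOException {
        Path input = Files.createTempFile("input", ".txt");
        Path checkpoint = Files.createTempFile("checkpoint", ".txt");
        Files.delete(checkpoint); // start without a checkpoint
        StringBuilder sb = new StringBuilder();
        for (int n = 1; n <= 10_000; n++) {
            sb.append("line ").append(n).append('\n');
        }
        Files.writeString(input, sb);

        process(input, checkpoint, line -> { }, 1_300);   // "power failure" after 1,300 lines
        System.out.println(readCheckpoint(checkpoint));
        List<String> resumed = new ArrayList<>();
        process(input, checkpoint, resumed::add, -1);     // the restarted run
        System.out.println(resumed.get(0));
        System.out.println(readCheckpoint(checkpoint));
    }
}
```

This prints 1300, `line 1301` and 10000. Because the checkpoint is written after the handler runs, a crash between the two reprocesses one line; the handler should tolerate that (at-least-once processing).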

answered 2013-03-05T19:30:40.770

You can also do it like this:

    // Requires: import static org.springframework.web.bind.annotation.RequestMethod.GET;
    @RequestMapping(value = "/updateStatusAndRestart/{jobId}/{stepId}", method = GET)
    public ResponseEntity<String> updateBatchStatus(@PathVariable("jobId") Long jobExecutionId,
            @PathVariable("stepId") Long stepExecutionId) throws Exception {

        // Mark the interrupted step execution as FAILED
        StepExecution stepExecution = jobExplorer.getStepExecution(jobExecutionId, stepExecutionId);
        stepExecution.setEndTime(new Date(System.currentTimeMillis()));
        stepExecution.setStatus(BatchStatus.FAILED);
        stepExecution.setExitStatus(ExitStatus.FAILED);
        jobRepository.update(stepExecution);

        // Mark the owning job execution as FAILED so that it can be restarted
        JobExecution jobExecution = stepExecution.getJobExecution();
        jobExecution.setEndTime(new Date(System.currentTimeMillis()));
        jobExecution.setStatus(BatchStatus.FAILED);
        jobExecution.setExitStatus(ExitStatus.FAILED);
        jobRepository.update(jobExecution);

        // Restart the failed execution (the original code referenced an undefined "execution" variable)
        jobOperator.restart(jobExecution.getId());

        return new ResponseEntity<String>("<h1> Batch Status Updated !! </h1>", HttpStatus.OK);
    }

Here I use a REST API endpoint that receives the jobExecutionId and stepExecutionId, sets the status of the step execution and the job execution to FAILED, and then restarts the job with the JobOperator.

answered 2020-08-20T17:47:44.450