1

我使用 Spring 批处理来构建 ETL 作业。我的主要工作只是从数据库中读取并写入另一个数据库。在我的主要工作之前,我需要检查源数据库中的状态以查看它是否准备好。只有当源数据库准备好时,我才会继续进行主要工作。我将检查状态逻辑实现为一个 Tasklet,并使用简单的逻辑构建我的作业,如果检查状态步骤失败重复此步骤,则步骤成功,然后继续主作业。我按如下方式构建工作:

@Bean
public Job myJob(MyListener listener) {

    return jobBuilderFactory.get(jobName)
                            .incrementer(new RunIdIncrementer())
                            .listener(listener)
                            .start(checkStatusStep())
                            .on("*").to(mainStep())
                            .on(ExitStatus.FAILED.getExitCode())
                            .to(checkStatusStep())
                            .end()
                            .build();
}

检查状态步骤是一个 tasklet,如下所示:

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws InterruptedException {
    //isReady method check the db to see if it is ready
    if (isReady()) {
        //contribution.setExitStatus(ExitStatus.COMPLETED);
        return RepeatStatus.FINISHED;
    } else {
        //contribution.setExitStatus(ExitStatus.FAILED);
        return RepeatStatus.CONTINUABLE;
    }
}

我阅读了spring批处理文档,发现条件跟随中的ON方法将检查步骤的退出状态,以便我在StepContribution中设置退出状态。让我感到困惑的是,当我将这两行注释掉时,代码仍然有效。

所以我的问题是,首先,如果条件流检查退出状态,为什么我的代码在没有明确更改退出状态的情况下工作?其次,为什么tasklet会返回一个重复状态,以及这个重复状态是由谁消费的。第三,有没有更好的方法来实现我的目标?

4

1 回答 1

2

如果条件流检查退出状态,为什么我的代码在没有明确更改退出状态的情况下工作?

因为默认情况下,如果 tasklet 返回RepeatStatus.FINISHED,它的退出代码将设置为COMPLETED

其次,为什么tasklet会返回一个重复状态,以及这个重复状态是由谁消费的。

一个 tasklet 被反复调用,TaskletStep直到它返回RepeatStatus.FINISHED或抛出一个异常来表示失败。对 a 的每次调用Tasklet都包含在一个事务中。因此,该结构是在循环内部具有事务边界的循环结构。

第三,有没有更好的方法来实现我的目标?

IMO,你的Tasklet实现是好的,你不需要退出状态。就像是:

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws InterruptedException {
   //isReady method check the db to see if it is ready
   if (isReady()) {
    return RepeatStatus.FINISHED;
   } else {
    return RepeatStatus.CONTINUABLE;
   }
}

这将持续运行,直到isReady为真,因此您无需在作业级别使用流程实现迭代。工作定义是:

@Bean
public Job myJob(MyListener listener) {

   return jobBuilderFactory.get(jobName)
                        .incrementer(new RunIdIncrementer())
                        .listener(listener)
                        .start(checkStatusStep())
                        .next(mainStep())
                        .build();
}

希望这可以帮助。

于 2019-01-28T09:13:56.600 回答