深入研究 Spring Batch,我想知道我们如何在 Job 的不同步骤之间共享数据?
我们可以为此使用 JobRepository 吗?如果是,我们该怎么做?
有没有其他方法可以做/实现同样的事情?
深入研究 Spring Batch,我想知道我们如何在 Job 的不同步骤之间共享数据?
我们可以为此使用 JobRepository 吗?如果是,我们该怎么做?
有没有其他方法可以做/实现同样的事情?
作业存储库间接用于在步骤之间传递数据(Jean-Philippe 是正确的,最好的方法是将数据StepExecutionContext
放入.ExecutionContextPromotionListener
JobExecutionContext
需要注意的是,还有一个侦听器可以将JobParameter
键提升到 a StepExecutionContext
(更冗长的命名JobParameterExecutionContextCopyListener
);如果你的工作步骤不是完全独立的,你会发现你经常使用这些。
否则,您将使用更复杂的方案在步骤之间传递数据,例如 JMS 队列或(天堂禁止)硬编码文件位置。
至于在上下文中传递的数据大小,我还建议您保持较小(但我没有关于
从一个步骤,您可以将数据放入StepExecutionContext
. 然后,使用监听器,您可以将数据从StepExecutionContext
提升到JobExecutionContext
。
这JobExecutionContext
在以下所有步骤中都可用。
注意:数据必须很短。这些上下文保存在JobRepository
by 序列化中,并且长度是有限的(如果我记得清楚的话,是 2500 个字符)。
因此,这些上下文很适合共享字符串或简单值,但不适用于共享集合或大量数据。
共享海量数据并不是 Spring Batch 的理念。Spring Batch 是一组不同的操作,而不是一个巨大的业务处理单元。
我会说你有3个选择:
StepContext
和推广它,JobContext
并且您可以从每个步骤访问它,您必须按照说明遵守大小限制@JobScope
bean 并将数据添加到该 bean,@Autowire
在需要的地方使用它(缺点是它是内存结构,如果作业失败数据丢失,可能会导致可重启性问题)ids
在JobContext
需要时访问并在作业成功完成时删除该临时表。这是我为保存可通过这些步骤访问的对象所做的操作。
@Component("myJobListener")
public class MyJobListener implements JobExecutionListener {
public void beforeJob(JobExecution jobExecution) {
String myValue = someService.getValue();
jobExecution.getExecutionContext().putString("MY_VALUE", myValue);
}
}
<listeners>
<listener ref="myJobListener"/>
</listeners>
@BeforeStep
public void initializeValues(StepExecution stepExecution) {
String value = stepExecution.getJobExecution().getExecutionContext().getString("MY_VALUE");
}
您可以使用 Java Bean 对象
通过这种方式,您可以根据需要存储大量数据
您可以将数据存储在简单对象中。喜欢:
AnyObject yourObject = new AnyObject();
public Job build(Step step1, Step step2) {
return jobBuilderFactory.get("jobName")
.incrementer(new RunIdIncrementer())
.start(step1)
.next(step2)
.build();
}
public Step step1() {
return stepBuilderFactory.get("step1Name")
.<Some, Any> chunk(someInteger1)
.reader(itemReader1())
.processor(itemProcessor1())
.writer(itemWriter1(yourObject))
.build();
}
public Step step2() {
return stepBuilderFactory.get("step2Name")
.<Some, Any> chunk(someInteger2)
.reader(itemReader2())
.processor(itemProcessor2(yourObject))
.writer(itemWriter2())
.build();
}
只需在编写器或任何其他方法中将数据添加到对象并在下一步的任何阶段获取它
使用ExecutionContextPromotionListener
:
public class YourItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;
public void write(List<? extends Object> items) throws Exception {
// Some Business Logic
// put your data into stepexecution context
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}
@BeforeStep
public void saveStepExecution(Final StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
现在您需要将 PromotionListener 添加到您的工作中
@Bean
public Step step1() {
return stepBuilder
.get("step1")<Company,Company> chunk(10)
.reader(reader()).processor(processor()).writer(writer())
.listener(promotionListener()).build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"someKey"});
listener.setStrict(true);
return listener;
}
现在,在第 2 步中,从作业 ExecutionContext 获取数据
public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;
public void write(List<? extends Object> items) throws Exception {
// ...
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}
如果您正在使用 tasklet,请使用以下命令获取或放置 ExecutionContext
List<YourObject> yourObjects = (List<YourObject>) chunkContent.getStepContext().getJobExecutionContext().get("someKey");
另一种非常简单的方法,留在这里以备将来参考:
class MyTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
getExecutionContext.put("foo", "bar");
}
}
和
class MyOtherTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
getExecutionContext.get("foo");
}
}
getExecutionContext
这是:
ExecutionContext getExecutionContext(ChunkContext chunkContext) {
return chunkContext.getStepContext()
.getStepExecution()
.getJobExecution()
.getExecutionContext();
}
把它放在一个超类中,作为一个default
方法放在一个接口中,或者简单地粘贴到你Tasklet
的 s.
Spring Batch 为自己创建元数据表(如batch_job_execution
、batch_job_execution_context
、batch_step_instance
等)。
而且我已经测试(使用 postgres DB)在一列()中可以有至少 51,428 个字符的数据batch_job_execution_context.serialized_content
。它可能更多,这就是我测试的程度。
当您将 Tasklets 用于您的步骤(如class MyTasklet implements Tasklet
)并覆盖其中的RepeatStatus
方法时,您可以立即访问ChunkContext
.
class MyTasklet implements Tasklet {
@Override
public RepeatStatus execute(@NonNull StepContribution contribution,
@NonNull ChunkContext chunkContext) {
List<MyObject> myObjects = getObjectsFromSomewhereAndUseThemInNextStep();
chunkContext.getStepContext().getStepExecution()
.getJobExecution()
.getExecutionContext()
.put("mydatakey", myObjects);
}
}
现在您可以使用不同的 Tasklet 进行另一个步骤,您可以在其中访问这些对象
class MyOtherTasklet implements Tasklet {
@Override
public RepeatStatus execute(@NonNull StepContribution contribution,
@NonNull ChunkContext chunkContext) {
List<MyObject> myObjects = (List<MyObject>)
chunkContext.getStepContext().getStepExecution()
.getJobExecution()
.getExecutionContext()
.get("mydatakey");
}
}
或者,如果您没有 Tasklet 并且有类似 Reader/Writer/Processor,那么
class MyReader implements ItemReader<MyObject> {
@Value("#{jobExecutionContext['mydatakey']}")
List<MyObject> myObjects;
// And now myObjects are available in here
@Override
public MyObject read() throws Exception {
}
}
我被分配了一项任务来逐个调用批处理作业。每个作业都依赖于另一个作业。第一个作业结果需要执行后续作业程序。我正在搜索如何在作业执行后传递数据。我发现这个 ExecutionContextPromotionListener 派上用场了。
1)我为“ExecutionContextPromotionListener”添加了一个bean,如下所示
@Bean
public ExecutionContextPromotionListener promotionListener()
{
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys( new String[] { "entityRef" } );
return listener;
}
2)然后我将其中一个听众附加到我的 Steps
Step step = builder.faultTolerant()
.skipPolicy( policy )
.listener( writer )
.listener( promotionListener() )
.listener( skiplistener )
.stream( skiplistener )
.build();
3) 我在我的 Writer 步骤实现中添加了 stepExecution 作为参考,并在 Beforestep 中填充
@BeforeStep
public void saveStepExecution( StepExecution stepExecution )
{
this.stepExecution = stepExecution;
}
4) 在编写器步骤结束时,我将 stepexecution 中的值填充为如下所示的键
lStepContext.put( "entityRef", lMap );
5) 作业执行后,我从 中检索值
lExecution.getExecutionContext()
并填充为作业响应。
6) 从作业响应对象中,我将获取值并在其余作业中填充所需的值。
上述代码用于使用 ExecutionContextPromotionListener 将步骤中的数据提升到 ExecutionContext。它可以在任何步骤中完成。
正如 Nenad Bozic 在他的第三个选项中所说,使用临时表在步骤之间共享数据,使用上下文共享也做同样的事情,它写入表并在下一步加载,但如果你写入临时表,你可以清理工作的结束。
使用Tasklets
. 无需访问执行上下文。我使用地图作为数据元素来移动。(科特林代码。)
class MyTasklet : Tasklet {
lateinit var myMap: MutableMap<String, String>
override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus? {
myMap.put("key", "some value")
return RepeatStatus.FINISHED
}
}
@Configuration
@EnableBatchProcessing
class BatchConfiguration {
@Autowired
lateinit var jobBuilderFactory: JobBuilderFactory
@Autowired
lateinit var stepBuilderFactory: StepBuilderFactory
var myMap: MutableMap<String, String> = mutableMapOf()
@Bean
fun jobSincAdUsuario(): Job {
return jobBuilderFactory
.get("my-SO-job")
.incrementer(RunIdIncrementer())
.start(stepMyStep())
.next(stepMyOtherStep())
.build()
}
@Bean
fun stepMyStep() = stepBuilderFactory.get("MyTaskletStep")
.tasklet(myTaskletAsBean())
.build()
@Bean
fun myTaskletAsBean(): MyTasklet {
val tasklet = MyTasklet()
tasklet.myMap = myMap // collection gets visible in the tasklet
return tasklet
}
}
然后,MyOtherStep
您可以复制中看到的相同成语MyStep
。这个另一个 Tasklet 将看到在MyStep
.
重要:
@Bean fun
以便它们可以使用@Autowired
(完整解释)。InitializingBean
使用覆盖乐趣 afterPropertiesSet() { Assert.notNull(myMap, "myMap 必须在调用 tasklet 之前设置") }