所以我的场景深入到本质如下:本质上,我有一个配置文件,其中包含一组 SQL 查询,其结果集需要导出为 CSV 文件。由于某些查询可能会返回数十亿行,并且由于某些事情可能会中断进程(错误、崩溃等),因此我想使用诸如 spring batch 之类的框架,它为我提供了可重启性和作业监控。我正在使用基于文件的 H2 数据库来持久化 spring 批处理作业。
所以,这是我的问题:
创建作业后,我需要为我的 RowMapper 提供一些初始配置。那么当一个工作在崩溃之后需要重新启动时会发生什么?具体来说:
- RowMapper 的状态是否自动持久化,并且在重新启动时 Spring 批处理将尝试从其数据库中恢复对象,或者
- 将使用作为原始 spring 批处理 XML 配置文件一部分的 RowMapper 对象,或者
- 我必须使用步骤/作业的 ExecutionContext 来维护 RowMapper 的状态?
上面的问题与使用spring批处理XML配置时是否有魔法有关,或者我是否可以以编程方式创建所有这些bean:因为我需要将自己的配置格式解析为spring批处理作业配置,我宁愿只使用 spring batch 的 Java 类(bean)并适当地填写它们,而不是尝试手动写出有效的 XML。但是,如果我的 Job 崩溃了,我会自己重新创建所有 bean。Spring Batch 是否会自动从其数据库中恢复作业状态?
如果我真的需要 XML,有没有办法将 spring-batch JobRepository(或其中一个对象)序列化为 spring-batch XML 配置?
现在,我尝试使用以下代码配置我的步骤 - 但我不确定这是否是正确的方法:
- TaskletStep 是要走的路吗?
- 我创建分块读取器/写入器的方式是否正确,还是应该使用其他对象?
我会假设读取器和写入器的打开会作为 JobExecution 的一部分自动发生,但是如果我在运行 Job 之前没有打开这些资源,我会收到一个异常,告诉我需要先打开它们。也许我需要创建一些其他管理资源的对象(jdbc 连接和文件句柄)?
JdbcCursorItemReader<Foobar> itemReader = new JdbcCursorItemReader<Foobar>(); itemReader.setSql(sqlStr); itemReader.setDataSource(dataSource); itemReader.setRowMapper(rowMapper); itemReader.afterPropertiesSet(); ExecutionContext executionContext = new ExecutionContext(); itemReader.open(executionContext); FlatFileItemWriter<String> itemWriter = new FlatFileItemWriter<String>(); itemWriter.setLineAggregator(new PassThroughLineAggregator<String>()); itemWriter.setResource(outResource); itemWriter.afterPropertiesSet(); itemWriter.open(executionContext); int commitInterval = 50000; CompletionPolicy completionPolicy = new SimpleCompletionPolicy(commitInterval); RepeatTemplate repeatTemplate = new RepeatTemplate(); repeatTemplate.setCompletionPolicy(completionPolicy); RepeatOperations repeatOperations = repeatTemplate; ChunkProvider<Foobar> chunkProvider = new SimpleChunkProvider<Foobar>(itemReader, repeatOperations); ItemProcessor<Foobar, String> itemProcessor = new ItemProcessor<Foobar, String>() { /* Custom implemtation */ }; ChunkProcessor<Foobar> chunkProcessor = new SimpleChunkProcessor<Foobar, String>(itemProcessor, itemWriter); Tasklet tasklet = new ChunkOrientedTasklet<QuadPattern>(chunkProvider, chunkProcessor); //new SplitFilesTasklet(); TaskletStep taskletStep = new TaskletStep(); taskletStep.setName(taskletName); taskletStep.setJobRepository(jobRepository); taskletStep.setTransactionManager(transactionManager); taskletStep.setTasklet(tasklet); taskletStep.afterPropertiesSet(); job.addStep(taskletStep);