2

TL;DR:应该如何使用 Spring Batch Job 创建 Spring Batch Jobs?事务边界似乎是问题所在。这似乎是一个经典的问题,但这里又出现了:

我有以下用例:我需要轮询 FTP 服务器并将找到的 XML 文件作为 blob 存储在数据库中。XML 有 0...N 个感兴趣的条目,我需要发送到外部 Web 服务并存储响应。响应可以是不可重试或可重试的,我需要存储每个请求及其响应以用于审计目的。

域/JPA 模型如下: Batch(包含 XML blob)包含 0-N 个 BatchRow 对象。BatchRow 包含要发送到 Web 服务的数据,它还包含 1...N 个 BatchRowHistory 对象,其中包含有关 Web 服务调用的状态信息。

我被要求使用 Spring Batch 来实现这一点(自从这种集成案例以来,Spring 集成可能是其他可能性)。现在我一直在努力使用不同的方法,我发现这项任务要复杂得多,因此也很困难,恕我直言。

我已将任务拆分为以下工作:

工作1

  • Step11:获取文件并以blob形式存储到数据库中。

  • Step12:将 XML 拆分为条目并将这些条目存储到 db。

  • Step13 : 创建 Job2 并为 Step12 中存储的每个条目启动它。Mark Job2 在域模型数据库中为条目创建了标志。

工作2

  • Step21:为每个条目调用Web服务并将结果存储到db。重试和跳过逻辑驻留在此处。Job2 类型可能需要手动重启等。

这种结构背后的逻辑是 Job1 定期运行(大约每分钟一次)。只要有这些作业并且它们成功或它们的重试限制达到并且它们失败,就会运行 Job2。领域模型存储基本上只存储结果,Spring Batch 负责运行显示。手动重新启动等可以通过 Spring Batch Admin 处理(至少我希望如此)。Job2 在 JobParameters 映射中有 BatchRow 的 id,因此可以在 Spring Batch Admin 中查看。

问题 1这种工作结构有意义吗?即为数据库中的每一行创建新的Spring Batch Jobs,这似乎违背了目的并在某种程度上重新发明了轮子?

问题 2如何在 Step13 中创建那些 Job2 条目?

我在事务和 JobRepository 方面遇到了第一个问题,但通过以下设置成功启动了几个作业:

<batch:step id="Step13" parent="stepParent">
 <batch:tasklet>
   <batch:transaction-attributes propagation="NEVER"/>
   <batch:chunk reader="rowsWithoutJobReader" processor="batchJobCreator" writer="itemWriter"
                commit-interval="10" />
 </batch:tasklet>
</batch:step>

<bean id="stepParent" class="org.springframework.batch.core.step.item.FaultTolerantStepFactoryBean" abstract="true"/>

请注意,commit-interval="10" 意味着这当前最多可以创建 10 个作业,仅此而已......因为 batchJobCreator 调用 JobLauncher.run 方法并且它会顺畅运行但 itemWriter 无法使用更新的信息将 BatchRows 写回数据库( boolean jobCreated 标志已打开)。明显的原因是交易属性中的传播.NEVER,但没有它我无法使用 jobLauncher 创建工作。

因为更新没有传递到数据库,我再次得到相同的 BatchRows 并且它们使日志混乱:

org.springframework.batch.retry.RetryException: Non-skippable exception in recoverer while processing; nested exception is org.springframework.batch.core.repository.JobExecutionAlreadyRunningException: A job execution for this job is already running: JobInstance: id=1, version=0, JobParameters=[{batchRowId=71}], Job=[foo.bar]
        at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$2.recover(FaultTolerantChunkProcessor.java:278)
        at org.springframework.batch.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:420)
        at org.springframework.batch.retry.support.RetryTemplate.doExecute(RetryTemplate.java:289)
        at org.springframework.batch.retry.support.RetryTemplate.execute(RetryTemplate.java:187)
        at org.springframework.batch.core.step.item.BatchRetryTemplate.execute(BatchRetryTemplate.java:215)
        at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.transform(FaultTolerantChunkProcessor.java:287)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:190)
        at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:74)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:386)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
        at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:264)
        at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76)
        at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367)
        at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214)
        at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143)
        at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:250)
        at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:195)
        at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135)
        at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61)
        at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60)
        at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144)
        at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124)
        at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
        at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:293)
        at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120)
        at java.lang.Thread.run(Thread.java:680)

这意味着该作业已在 Spring Batch 中创建,它会在稍后执行 Step13 时再次尝试创建这些文件。我可以规避在 Job2/Step21 中将 jobCreated 标志设置为 true,但这对我来说有点笨拙和错误。

问题 3:我有更多的领域对象驱动方法;我让 Spring Batch Jobs 使用非常精细的 JPQL 查询和 JPAItemReaders 扫描域表。这种方法的问题在于它没有使用 Spring Batch 的更精细的特性。历史和重试逻辑是问题所在。我需要将重试逻辑直接编码到 JPQL 查询中(例如,如果 BatchRow 有超过 3 个 BatchRowHistory 元素,它就失败了,需要手动重新检查)。我是否应该硬着头皮继续使用这种方法,而不是尝试为每个 Web 服务调用创建单独的 Spring Batch Job?

软件信息(如果需要):Spring Batch 2.1.9、Hibernate 4.1.2、Spring 3.1.2、Java 6。

提前谢谢你,对长篇大论感到抱歉,蒂莫

编辑1: 我认为我需要产生新工作的原因是:

  • 阅读器返回 null 或抛出异常时循环

  • 交易开始

  • 整个 N 行的读取器 - 处理器 - 写入器循环

  • 批量大小 N 的事务结束

每个失败的条目都是问题;我想要为批处理中的每一行手动重新启动执行(作业是 Spring Batch Admin 中唯一可重新启动的,对吗?),以便我可以使用 Spring Batch Admin 查看失败的作业(其作业参数包含行 ID从域数据库)并重新启动这些等。我如何在不产生作业并将历史存储到域数据库的情况下完成这种行为?

4

1 回答 1

0

好吧,我讨厌回答问题……但我需要知道些什么?

1)如果您的输入文件是 XML,为什么不在它们上使用 StaxEventItemReader 并简单地在步骤 1 中保留您的条目?

2)从一个步骤开始第二份工作!!!!我什至不知道它是否应该工作......但是IMO ..它闻起来;-)

为什么不定义另一个步骤,使用 JdbcCursorItemReader 读取条目并调用 ItemProcessor 中的 Web 服务,然后将结果写入数据库?

也许我不理解您为每次调用 Web 服务创建不同工作的要求!!!

我做了类似于你的用例的事情,它是使用这个场景完成的:

作业 1:步骤 1:读取 xml,处理 pojo->domain obj,将 domain obj 写入 DB

作业 2:步骤 1:从 db 读取 obj,进程 = 调用 WS,在 DB 中写入响应

这很简单并且效果很好(包括可重新启动和跳过功能)

希望它会有所帮助

问候

于 2013-05-02T14:54:14.193 回答