1

我需要依次执行七个不同的过程(一个接一个)。数据存储在 Mysql 中。我正在考虑以下选项,如果我错了,或者如果有更好的解决方案,请纠正我。

要求:

  1. 从 Db 中读取数据,最后执行七个过程(datavalidation、calculation1、calculation2 ...等),将处理后的数据写入 DB。

  2. 需要分块处理数据。

我的解决方案和问题: 数据读取:

  1. 使用 JdbcCursorItemReader 读取数据,因为这是性能最好的数据库读取器 - 但是,SQL 非常复杂,所以我可能不得不考虑使用 JdbcTemplate 自定义 ItemReader?这使我在处理数据时更加灵活。

过程:

  1. 定义七个步骤和块,使用databean共享步骤之间的数据。但是,这不是一个好主意,因为数据以块的形式处理,并且在每个块之后,step1 编写器将在 databean 中创建一组新数据。当这个databean 跨其他步骤共享时,数据完整性将成为一个问题。

  2. 使用 StepExecutionContext 在步骤之间共享数据。但这可能会影响性能,因为这涉及批处理作业存储库。

  3. 只定义一个步骤,一个 ItemReader 和一个进程链(七个进程),并创建一个 ItemWriter 将处理后的数据写入 DB。但是,我将无法管理或监控每个不同的进程,所有这些都将在一个步骤中完成。

4

2 回答 2

5

org.springframework.batch.item.support.CompositeItemProcessor是 Spring Batch 框架中的一个开箱即用的组件,它将支持您的要求,类似于您的第二个选项。这将允许您执行以下操作;- 在您的设计/解决方案中保持分离以从数据库中读取(itemreader) - 保持每个单独的处理器“关注点”和配置的分离 - 允许任何单独的处理器通过返回 null 来“关闭”块,而不管之前的进程如何

CompositeItemProcessor迭代一个委托循环,因此它与动作模式“相似” 。它在您描述的场景中非常有用,并且仍然允许您利用 Chunk 的好处(异常、重试、提交策略等)

于 2013-04-14T11:44:07.583 回答
1

建议:

1) 使用 JdbcCursorItemReader 读取数据。

所有开箱即用的组件都是不错的选择,因为它们已经实现了 ItemStream 接口,使您的步骤可重新启动。但是就像您提到的,有时,请求只是为了复杂化,或者像我一样,您已经拥有可以重用的服务或 DAO。

我建议您使用 ItemReaderAdapter。它允许您配置一个委托服务来调用以获取您的数据。

    <bean id="MyReader" class="xxx.adapters.MyItemReaderAdapter">
       <property name="targetObject" ref="AnExistingDao" />
       <property name="targetMethod" value="next" />        
   </bean>

注意 targetMethod 必须遵守 ItemReaders 的读取约定(没有更多数据时返回 null)

如果您的工作不需要重新启动,您可以简单地使用类:org.springframework.batch.item.adapter.ItemReaderAdapter

但是如果你需要你的工作是可重新启动的,你可以像这样创建你自己的 ItemReaderAdapter:

public class MyItemReaderAdapter<T> extends AbstractMethodInvokingDelegator<T> implements ItemReader<T>, ItemStream {




private long currentCount = 0;

private final String CONTEXT_COUNT_KEY = "count"; 

/**
 * @return return value of the target method.
 */
public T read() throws Exception {

    super.setArguments(new Long[]{currentCount++});
    return invokeDelegateMethod();
}
@Override
public void open(ExecutionContext executionContext)
        throws ItemStreamException {
    currentCount = executionContext.getLong(CONTEXT_COUNT_KEY,0);

}


@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
    executionContext.putLong(CONTEXT_COUNT_KEY, currentCount);
    log.info("Update Stream current count : " + currentCount);
}


@Override
public void close() throws ItemStreamException {
    // TODO Auto-generated method stub

}

}

因为开箱即用的 itemReaderAdapter 不可重新启动,您只需创建自己的实现 ItemStream

2)关于 7 步与 1 步。

我会在这一步上使用compositeProcessor 一步。7个步骤选项只会带来问题IMO。

1)7步databean:所以你的作家在databean中提交直到第7步..然后第7步作家尝试提交到真正的数据库和繁荣错误!!!一切都丢失了,批次必须从第 1 步重新开始!!

2)上下文的7个步骤:可能会更好,因为您将状态保存在spring批处理元数据中..但是将大数据存储在springBatch的元数据中不是一个好习惯!

3)是去海事组织的方式。;-)

于 2013-04-15T13:50:17.237 回答