16

我有一个直截了当的要求,我需要读取项目列表(从数据库)并需要处理这些项目,一旦处理,它必须更新到数据库中。

我正在考虑将 Spring 批处理块与读取器、处理器和写入器一起使用。我的阅读器将一次从列表中返回一个项目并将其发送到处理器,一旦处理结束,它会返回到 Writer 并在其中更新数据库

稍后我可能会在这些方法中以一些同步成本对它进行多线程处理。

在这里,我预见到一些担忧。

  1. 要处理的项目数量可能更多。可能在 10,000 甚至更多。
  2. 处理器中需要进行一些逻辑计算。因此一次处理 1 个项目。即使它是具有 10 个线程的多线程,也不确定性能。
  3. Writer 可以更新数据库中该已处理项目的结果。不确定如何进行批量更新,因为它始终只有 1 个项目已处理并准备就绪。

这种方法对于这种用例是否正确或可以做任何更好的事情?有没有其他方法可以在一次读取器、处理器和写入器的调用中处理一堆项目?如果是这样,我是否需要创建一些机制,从列表中提取 10 个项目并将其提供给处理器?似乎作者更新了每条记录,只有当作者收到一堆已处理的项目时,批量更新才有意义。有什么建议吗?

请在此设计上添加一些亮点以获得更好的性能。

谢谢,

4

1 回答 1

15

Spring Batch 是满足您需要的完美工具。

面向块的步骤允许您使用 commit-interval 属性配置要读取/处理/写入的项目数量。

        <batch:step id="step1" next="step2">
        <batch:tasklet transaction-manager="transactionManager" start-limit="100">
            <batch:chunk reader="myReader" processor="myProcessor" writer="MyWriter" commit-interval="800" />
            <batch:listeners>
                <batch:listener ref="myListener" />
            </batch:listeners>
        </batch:tasklet>
    </batch:step>

假设您的读者将调用返回 10 000 条记录的 SELECT 语句。然后你设置了一个 commit-interval=500。

MyReader 将调用 read() 方法 500 次。假设实际上,阅读器实现实际上可能会从结果集中删除项目。对于每次调用 read(),它还会调用 MyProcessor 的 process() 方法。

但在达到提交间隔之前,它不会调用 MyWriter 的 write() 方法。

如果看一下接口ItemWriter的定义:

public interface ItemWriter<T> {

/**
 * Process the supplied data element. Will not be called with any null items
 * in normal operation.
 * 
 * @throws Exception if there are errors. The framework will catch the
 * exception and convert or rethrow it as appropriate.
 */
void write(List<? extends T> items) throws Exception;

}

您会看到 write 收到一个项目列表。该列表将是您的提交间隔的大小(如果达到结尾,则更少)

顺便说一句,10 000 条记录算不了什么。如果您必须处理数百万条记录,您可以考虑使用多线程。但即便如此,仅仅使用提交间隔值的最佳位置可能就足够了。

希望能帮助到你

于 2013-06-17T13:28:23.643 回答