10

使用 Spring Batch 2.2.1,我配置了一个 Spring Batch Job,我使用了这种方法:

配置如下:

  • Tasklet 使用 ThreadPoolTask​​Executor 限制为 15 个线程

  • 油门限制等于线程数

  • 块用于:

    • JdbcCursorItemReader 的 1 个同步适配器,允许多个线程根据 Spring Batch 文档建议使用它

      您可以同步对 read() 的调用,只要处理和写入是块中最昂贵的部分,您的步骤仍然可以比单线程配置更快地完成。

    • JdbcCursorItemReader 上的 saveState 为 false

    • 基于 JPA 的自定义 ItemWriter。请注意,它对一项的处理可能会因处理时间而异,可能需要几毫秒到几秒(> 60 秒)。

    • commit-interval 设置为 1(我知道它可能会更好,但这不是问题)

  • 所有 jdbc 池都很好,关于 Spring Batch 文档推荐

由于以下原因,运行批处理会导致非常奇怪和糟糕的结果:

  • 在某个步骤,如果写入器需要一些时间来处理这些项目,线程池中的几乎所有线程最终都什么都不做而不是处理,只有慢速写入器在工作。

查看 Spring Batch 代码,根本原因似乎在这个包中:

  • org/springframework/批处理/重复/支持/

这种工作方式是功能还是限制/错误?

如果它是一项功能,那么通过配置使所有线程不会因长时间的处理工作而饿死而不必重写所有内容的方式是什么?

请注意,如果所有项目都花费相同的时间,那么一切正常并且多线程也可以,但是如果其中一个项目处理需要更多时间,那么多线程对于慢速进程的工作时间几乎是无用的。

注意我打开了这个问题:

4

4 回答 4

5

正如亚历克斯所说,这种行为似乎是根据 javadocs 的合同:

子类只需要提供一个获取下一个结果的方法 * 和一个等待所有结果从并发进程或线程返回的方法

看着:

TaskExecutorRepeatTemplate#waitForResults

您的另一个选择是使用 Partitioning :

  • 一个 TaskExecutorPartitionHandler 将从 Partitionned ItemReader 执行项目,见下文
  • 一个 Partitioner 实现,它提供了 ItemReader 处理的范围,请参阅下面的 ColumnRangePartitioner
  • CustomReader 将使用 Partitioner 将填充的数据读取数据,请参阅下面的 myItemReader 配置

Michael Minella 在他的书Pro Spring Batch的第 11 章中解释了这一点

<batch:job id="batchWithPartition">
    <batch:step id="step1.master">
        <batch:partition  partitioner="myPartitioner" handler="partitionHandler"/>
    </batch:step>       
</batch:job>
<!-- This one will create Paritions of Number of lines/ Grid Size--> 
<bean id="myPartitioner" class="....ColumnRangePartitioner"/>
<!-- This one will handle every partition in a Thread -->
<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="multiThreadedTaskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>
<batch:step id="step1">
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk reader="myItemReader"
                writer="manipulatableWriterForTests" commit-interval="1"
                skip-limit="30000">
                <batch:skippable-exception-classes>
                    <batch:include class="java.lang.Exception" />
                </batch:skippable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
</batch:step>
 <!-- scope step is critical here-->
<bean id="myItemReader"    
                        class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql">
        <value>
            <![CDATA[
                select * from customers where id >= ? and id <=  ?
            ]]>
        </value>
    </property>
    <property name="preparedStatementSetter">
        <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
            <property name="parameters">
                <list>
 <!-- minValue and maxValue are filled in by Partitioner for each Partition in an ExecutionContext-->
                    <value>{stepExecutionContext[minValue]}</value>
                    <value>#{stepExecutionContext[maxValue]}</value>
                </list>
            </property>
        </bean>
    </property>
    <property name="rowMapper" ref="customerRowMapper"/>
</bean>

分区器.java:

 package ...;
  import java.util.HashMap;  
 import java.util.Map;
 import org.springframework.batch.core.partition.support.Partitioner;
 import org.springframework.batch.item.ExecutionContext;
 public class ColumnRangePartitioner  implements Partitioner {
 private String column;
 private String table;
 public Map<String, ExecutionContext> partition(int gridSize) {
    int min =  queryForInt("SELECT MIN(" + column + ") from " + table);
    int max = queryForInt("SELECT MAX(" + column + ") from " + table);
    int targetSize = (max - min) / gridSize;
    System.out.println("Our partition size will be " + targetSize);
    System.out.println("We will have " + gridSize + " partitions");
    Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
    int number = 0;
    int start = min;
    int end = start + targetSize - 1;
    while (start <= max) {
        ExecutionContext value = new ExecutionContext();
        result.put("partition" + number, value);
        if (end >= max) {
            end = max;
        }
        value.putInt("minValue", start);
        value.putInt("maxValue", end);
        System.out.println("minValue = " + start);
        System.out.println("maxValue = " + end);
        start += targetSize;
        end += targetSize;
        number++;
    }
    System.out.println("We are returning " + result.size() + " partitions");
    return result;
}
public void setColumn(String column) {
    this.column = column;
}
public void setTable(String table) {
    this.table = table;
}
}
于 2013-08-19T17:17:08.077 回答
3

这是我认为正在发生的事情:

  • 如您所说,您的 ThreadPoolTask​​Executor 仅限于 15 个线程
  • 框架的“块”导致 JdbcCursorItemReader 中的每个项目(达到线程限制)在不同的线程中执行
  • 但是,鉴于您的提交间隔为 1,Spring Batch 框架也在等待每个线程(即全部 15 个)完成各自的读/处理/写流程,然后再进入下一个块。有时,这会导致 14线程在一个需要永远完成的兄弟线程上等待近 60 秒。

换句话说,要使 Spring Batch 中的这种多线程方法有用,每个线程需要在大约相同的时间内处理。鉴于某些项目的处理时间之间存在巨大差异的场景,您会遇到许多线程已完成并等待长时间运行的同级线程能够移动到下一个处理块的限制。

我的建议:

  • 一般来说,我会说增加你的提交间隔应该有所帮助,因为它应该允许在提交之间的单个线程中处理多个光标项,即使其中一个线程卡在长时间运行的写入上。然而,如果你不走运,多个长事务可能会发生在同一个线程中并使事情变得更糟(例如,单个线程中的提交间隔为 120 秒,提交间隔为 2)。
  • 具体来说,我建议将您的线程池大小增加到一个很大的数字,甚至超过您的最大数据库连接数 2 倍或 3 倍。应该发生的是,即使您的某些线程会阻止尝试获取连接(因为线程池大小较大),您实际上会看到吞吐量增加,因为您的长时间运行的线程不再阻止其他线程从光标中获取新项目并同时继续批处理作业的工作(在块的开头,您的待处理线程数将大大超过您可用的数据库连接数。因此,操作系统调度程序在激活线程时会稍微搅动一下在获取数据库连接时被阻止并且必须停用线程。但是,
于 2013-08-19T13:50:49.757 回答
1

在我的情况下,如果我没有设置油门限制,那么只有 4 个线程进入 ItemReader 的 read() 方法,这也是默认线程数,如果未根据 Spring Batch 文档在 tasklet 标记中指定。

如果我指定更多线程,例如 10 或 20 或 100,那么只有 8 个线程进入 ItemReader 的 read() 方法

于 2013-09-21T06:07:39.057 回答
1

无论 throttle-limit 的值如何,8 个活动线程的限制可能是由 Spring Batch Job 存储库的争用引起的。每次处理一个块时,都会在作业存储库中写入一些信息。增加其池大小以适应您需要的线程数!

于 2016-01-08T14:44:44.913 回答