正如亚历克斯所说,根据 javadoc 的说明,这种行为似乎符合其约定(contract):
子类只需要提供一个获取下一个结果的方法,以及一个等待所有结果从并发进程或线程返回的方法。
请参阅:
TaskExecutorRepeatTemplate#waitForResults
您的另一个选择是使用 Partitioning :
- 一个 TaskExecutorPartitionHandler,它将执行已分区(partitioned)的 ItemReader 中的各项,见下文
- 一个 Partitioner 实现,它提供了 ItemReader 处理的范围,请参阅下面的 ColumnRangePartitioner
- 一个自定义的 ItemReader,它按照 Partitioner 填充的范围读取数据,请参阅下面的 myItemReader 配置
Michael Minella 在他的书Pro Spring Batch的第 11 章中解释了这一点:
<!-- Job with a single partitioned master step: the "myPartitioner" bean is the
     partitioner and the "partitionHandler" bean is the handler. -->
<batch:job id="batchWithPartition">
    <batch:step id="step1.master">
        <batch:partition
            partitioner="myPartitioner"
            handler="partitionHandler"/>
    </batch:step>
</batch:job>
<!-- Creates the partitions; each one is sized as (number of lines / grid size). -->
<bean class="....ColumnRangePartitioner"
      id="myPartitioner"/>
<!-- Handles every partition in its own thread: runs step "step1" through the
     "multiThreadedTaskExecutor" task executor with a grid size of 10. -->
<bean id="partitionHandler"
      class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="multiThreadedTaskExecutor"/>
    <property name="step" ref="step1"/>
    <property name="gridSize" value="10"/>
</bean>
<!-- Step referenced by "partitionHandler": chunk-oriented, reads with "myItemReader"
     and writes with "manipulatableWriterForTests", committing after every item.
     Any java.lang.Exception is skippable, up to a limit of 30000 skips. -->
<batch:step id="step1">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk
            reader="myItemReader"
            writer="manipulatableWriterForTests"
            commit-interval="1"
            skip-limit="30000">
            <batch:skippable-exception-classes>
                <batch:include class="java.lang.Exception"/>
            </batch:skippable-exception-classes>
        </batch:chunk>
    </batch:tasklet>
</batch:step>
<!-- scope="step" is critical here: the #{stepExecutionContext[...]} expressions below
     are late-bound, so the reader has to be created per step execution in order to
     see the values of that execution's context. -->
<bean id="myItemReader"
      class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql">
        <value>
            <![CDATA[
                select * from customers where id >= ? and id <= ?
            ]]>
        </value>
    </property>
    <!-- Binds the two "?" placeholders of the query, in order. -->
    <property name="preparedStatementSetter">
        <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
            <property name="parameters">
                <list>
                    <!-- minValue and maxValue are filled in by the Partitioner for each
                         partition, in that partition's ExecutionContext. Both entries need
                         the leading '#': without it the text is passed as a literal string
                         instead of being evaluated as an expression. -->
                    <value>#{stepExecutionContext[minValue]}</value>
                    <value>#{stepExecutionContext[maxValue]}</value>
                </list>
            </property>
        </bean>
    </property>
    <property name="rowMapper" ref="customerRowMapper"/>
</bean>
ColumnRangePartitioner.java:
package ...;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
public class ColumnRangePartitioner implements Partitioner {
private String column;
private String table;
public Map<String, ExecutionContext> partition(int gridSize) {
int min = queryForInt("SELECT MIN(" + column + ") from " + table);
int max = queryForInt("SELECT MAX(" + column + ") from " + table);
int targetSize = (max - min) / gridSize;
System.out.println("Our partition size will be " + targetSize);
System.out.println("We will have " + gridSize + " partitions");
Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
int number = 0;
int start = min;
int end = start + targetSize - 1;
while (start <= max) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + number, value);
if (end >= max) {
end = max;
}
value.putInt("minValue", start);
value.putInt("maxValue", end);
System.out.println("minValue = " + start);
System.out.println("maxValue = " + end);
start += targetSize;
end += targetSize;
number++;
}
System.out.println("We are returning " + result.size() + " partitions");
return result;
}
public void setColumn(String column) {
this.column = column;
}
public void setTable(String table) {
this.table = table;
}
}