我有批处理作业,我从数据库中读取了超过 100 万条记录,并且正在使用 Scrollable Resultset 访问这些记录。现在我正在将该工作转换为春季批次。可滚动的结果集在这种情况下不起作用。我已经尝试过,但是在读取第一个块结果集中的记录后关闭,并且当批处理尝试在下一步中访问它时,它会抛出异常:“无法对关闭结果集进行操作”。
我是春季批次的新手。任何人都可以帮助我如何在阅读器中实现可滚动结果集逻辑。由于内存中的 1M 记录并不是一个好主意。
问候,
我有批处理作业,我从数据库中读取了超过 100 万条记录,并且正在使用 Scrollable Resultset 访问这些记录。现在我正在将该工作转换为春季批次。可滚动的结果集在这种情况下不起作用。我已经尝试过,但是在读取第一个块结果集中的记录后关闭,并且当批处理尝试在下一步中访问它时,它会抛出异常:“无法对关闭结果集进行操作”。
我是春季批次的新手。任何人都可以帮助我如何在阅读器中实现可滚动结果集逻辑。由于内存中的 1M 记录并不是一个好主意。
问候,
一种可能的解决方案是查看Continuable
Tasklet。Tasklet 可以像 Spring Batch 管理的那样“循环”,允许您使用ResultSet.next()
单次访问检索记录并进行处理,而无需框架关闭流。
package de.incompleteco.spring.batch.step.tasklet;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.annotation.Resource;
import javax.sql.DataSource;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
public class ScrollableResultsetTasklet implements Tasklet {
private boolean open = false;
@Resource
private DataSource dataSource;
private String sql = "select * from test_table";
private ResultSet rs;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
if (!open) {
//open the resultset
rs = open();
open = true;//set to open
}//end if
//move
rs.next();
if (!rs.isAfterLast()) {
//show
System.out.println(rs.getInt(1));
return RepeatStatus.CONTINUABLE;
}//end if
//done
return RepeatStatus.FINISHED;
}
protected ResultSet open() throws SQLException {
return dataSource.getConnection().createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_READ_ONLY).executeQuery(sql);
}
}
当然,这不是“可重新启动”,但也不是可滚动的结果集(无需一些额外的工作)。
在从块读取的情况下,您可以查看将结果集管理器“外部化”到一个单独的 pogo,它将在批处理块之间保持自己的“状态”。这可能看起来像这样;
package de.incompleteco.spring.batch.service;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.annotation.Resource;
import javax.sql.DataSource;
public class ScrollableResultSetService {
private boolean open = false;
@Resource
private DataSource dataSource;
private String sql = "select * from test_table";
private ResultSet rs;
/**
* retrieve the result set in a 'next' state
* @return
* @throws SQLException
*/
public ResultSet getNext() throws SQLException {
if (rs == null || !open) {
//open the resulset
rs = dataSource.getConnection().createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_READ_ONLY).executeQuery(sql);
}//end if
//move
rs.next();
//test
if (rs.isAfterLast()) {
return null;
}//end if
return rs;
}
}
然后 itemreader 看起来有点像这样;
package de.incompleteco.spring.batch.step.item;
import java.sql.ResultSet;
import javax.annotation.Resource;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import de.incompleteco.spring.batch.service.ScrollableResultSetService;
public class ScrollableResultSetItemReader implements ItemReader<T> {
@Resource
private ScrollableResultSetService service;
@Override
public T read() throws Exception, UnexpectedInputException, ParseException,NonTransientResourceException {
ResultSet rs = service.getNext();
if (rs == null) {
return null;//don't continue
}//end if
//process the result set into your object
//...
//return object
return T;
}
}
您可以使用开箱即用的 JdbcPagingItemReader 或 JdbcCursorItemReader。两种都试一下,看看哪一种最适合你。
您的实现将与两者相同..仅更改配置!!!;-)
它们都可以让您读取大量记录,而无需将它们全部放入内存中。
1M 记录与 Spring Batch 无关……如果您使用正确的阅读器;-)
编辑:根据您的评论:
您能否提供现有接口/Impl 的代码示例?
通常,当您想使用现有的服务/Dao 时,可以使用 ItemReaderAdapter。这将让您定义委托 read() 的对象和方法。但是开箱即用的实现非常基础,会在启动时调用您的自定义 service.find() (InitializingBean)。所以你最终会在内存中的 List 中得到 1M 个对象!
因为您的 find 方法可能会运行查询并将结果集映射到 List
我建议您将 SQL 从您的实现类移动到 JdbcCursorItemreader 并使用 stepListener 来验证 jobParameters 并在 JdbcCursorItemreader 中设置正确的 Sql。
问候