2

我正在编写一个使用 Spring Batch 处理 MySQL 数据库表中的 7,637,064 行的程序。我在较小的表上取得了成功,但是当 JdbcCursorItemReader 尝试打开游标时,此表中的大量行导致 OutOfMemoryError 异常。

我可能可以通过向它扔一个更大的 Xmx 来解决这个问题,但在我看来,Spring Batch 应该有一种方法来处理这个问题,而且我可能只是缺少一个关键的配置。

Spring批处理配置:

  <job id="reportJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="largeTableTransfer">
      <tasklet>
        <chunk reader="largeTableReader" processor="largeTableTransformer" writer="largeTableWriter"
          commit-interval="10" />
      </tasklet>
    </step>
  </job>

  <bean id="largeTableReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="dataSource" ref="inputDataSource" />
    <property name="sql" value="select * from largeTable" />
    <property name="rowMapper">
      <bean class="myproject.reader.largeTableRowMapper" />
    </property>
  </bean>
  <bean id="largeTableTransformer" class="myproject.transformer.LargeTableTransformer" />
  <bean id="largeTableWriter" class="myproject.writer.JdbcLargeTableWriter">
    <property name="dataSource" ref="outputDataSource" />
  </bean>

在 JdbcCursorItemReader 上设置fetchSize似乎没有效果。唯一允许它运行完成的是将maxRows设置为一个较小的数字,但随后只会处理该数量的行。

相关的堆栈跟踪:

2012-11-21 11:25:29,931 DEBUG [org.springframework.batch.core.repository.dao.JdbcStepExecutionDao] - <Truncating long message before update of StepExecution, original message is: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2734)
    at java.util.ArrayList.ensureCapacity(ArrayList.java:167)
    at java.util.ArrayList.add(ArrayList.java:351)
    at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:2821)
    at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:467)
    at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:2510)
    at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:1746)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2135)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2542)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1734)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1885)
    at org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:93)
    at org.springframework.batch.item.database.JdbcCursorItemReader.openCursor(JdbcCursorItemReader.java:125)
    at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:401)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:134)
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:93)
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:301)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:192)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135)
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61)
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124)
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:293)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114)
    at ifpress.ams2amx.ExampleJobConfigurationTests.testLaunchJob(ExampleJobConfigurationTests.java:32)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
4

1 回答 1

3

MySql JDBC 驱动程序中存在一个问题,导致将整个数据集加载到内存中,而不管您传递给语句创建方法的参数如何。有关如何正确打开游标,请参阅http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html

这是我的做法:

<bean class="org.springframework.batch.item.database.JdbcCursorItemReader">
  <property name="verifyCursorPosition" value="false" />
   <property name="dataSource" ref="remoteDataSource" />
   <property name="rowMapper">
     <bean class="org.springframework.jdbc.core.SingleColumnRowMapper" />
   </property>
   <property name="fetchSize">
     <util:constant static-field="java.lang.Integer.MIN_VALUE" />
   </property>
   <property name="sql">
     <value>SELECT foo, bar FROM baz</value>
  </property>
</bean>
于 2012-11-22T00:50:13.123 回答