3

我正在尝试使用 Spring 和 Hibernate 将数据导入 Oracle 数据库。我对一个神秘的停顿感到困惑,它削弱了我的程序的性能。

这是一个典型的循环。请注意“禁用自动提交”后一分半钟的间隔。

2014-10-23 12:03:20,054 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.e.j.i.LogicalConnectionImpl - Obtaining JDBC connection
2014-10-23 12:03:20,054 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.e.j.i.LogicalConnectionImpl - Obtained JDBC connection
2014-10-23 12:03:20,054 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.e.t.spi.AbstractTransactionImpl - begin
2014-10-23 12:03:20,054 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.e.t.i.jdbc.JdbcTransaction - initial autocommit status: true
2014-10-23 12:03:20,054 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.e.t.i.jdbc.JdbcTransaction - disabling autocommit
2014-10-23 12:04:52,759 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.e.t.spi.AbstractTransactionImpl - committing
2014-10-23 12:04:52,759 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.e.i.AbstractFlushingEventListener - Processing flush-time cascades
2014-10-23 12:04:52,759 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.e.i.AbstractFlushingEventListener - Dirty checking collections
2014-10-23 12:04:52,760 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.e.i.AbstractFlushingEventListener - Flushed: 0 insertions, 0 updates, 0 deletions to 100 objects
2014-10-23 12:04:52,760 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.e.i.AbstractFlushingEventListener - Flushed: 0 (re)creations, 0 updates, 0 removals to 0 collections
2014-10-23 12:04:52,760 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.internal.util.EntityPrinter - Listing entities:

(大量记录的实体数据被剪断)

2014-10-23 12:04:52,761 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.internal.util.EntityPrinter - More......
2014-10-23 12:04:52,831 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.e.t.i.jdbc.JdbcTransaction - committed JDBC Connection
2014-10-23 12:04:52,831 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.e.t.i.jdbc.JdbcTransaction - re-enabling autocommit
2014-10-23 12:04:52,831 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.e.j.i.LogicalConnectionImpl - Releasing JDBC connection
2014-10-23 12:04:52,831 1.0.0-SNAPSHOT  [ForkJoinPool.commonPool-worker-2] DEBUG o.h.e.j.i.LogicalConnectionImpl - Released JDBC connection

写入数据库的 Java 代码(上面的日志的 BATCH_SIZE == 100):

OracleDao odao = (OracleDao) mainCtx.getBean("oracleDao");
IntStream.rangeClosed(0, (rows.size() - 1) / BATCH_SIZE)
  .mapToObj(i -> {
    int end = (i + 1) * BATCH_SIZE;
    if (end > rows.size()) {
       end = rows.size();
    }
    return rows.subList(i * BATCH_SIZE, end);
  })
  .parallel()
  .forEach(batch -> {
    odao.saveBatch(batch);
  });

OracleDao.saveBatch() 非常简单:

@Transactional
public void saveBatch (List<?> rows) {
  Session session = sessionFactory.getCurrentSession();

  rows.stream().forEach(row -> session.saveOrUpdate(row));

  return;
}

编辑:这是我的应用程序上下文(出于某种原因,将其包含在原始帖子中使 Stack Overflow 认为它是垃圾邮件):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

  <context:component-scan base-package="com.mydomain" />

  <bean id="hikariConfig" class="com.zaxxer.hikari.HikariConfig">
    <property name="dataSourceClassName" value="oracle.jdbc.pool.OracleDataSource" />
    <property name="dataSourceProperties">
      <props>
        <prop key="url">(url)</prop>
        <prop key="user">(user)</prop>
        <prop key="password">(password)</prop>
      </props>
    </property>
  </bean>

  <bean id="oracleDS" class="com.zaxxer.hikari.HikariDataSource" destroy-method="shutdown">
    <constructor-arg ref="hikariConfig" />
  </bean>

  <bean id="sessionFactory" class="org.springframework.orm.hibernate4.LocalSessionFactoryBean">
    <property name="dataSource" ref="oracleDS" />
    <property name="packagesToScan">
      <list>
        <value>com.mydomain.dao</value>
        <value>com.mydomain.data</value>
      </list>
    </property>
    <property name="hibernateProperties">
      <props>
          <prop key="hibernate.dialect">org.hibernate.dialect.Oracle10gDialect</prop>
          <prop key="hibernate.show_sql">false</prop>
          <prop key="hibernate.id.new_generator_mappings">true</prop>
          <prop key="hibernate.connection.autocommit">false</prop>
      </props>
    </property>
  </bean>

  <bean id="jpaAdapter" class="org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter">
    <property name="showSql" value="false" />
    <property name="databasePlatform" value="org.hibernate.dialect.Oracle10gDialect" />
    <property name="database" value="ORACLE" />
    <property name="generateDdl" value="false" />
  </bean>

  <bean id="oracleEMF" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
    <property name="dataSource" ref="oracleDS" />
    <property name="jpaVendorAdapter" ref="jpaAdapter" />
    <property name="packagesToScan">
      <list>
        <value>com.mydomain.dao</value>
        <value>com.mydomain.data</value>
      </list>
    </property>
    <property name="jpaProperties">
      <props>
        <prop key="hibernate.dialect">org.hibernate.dialect.Oracle10gDialect</prop>
        <prop key="hibernate.show_sql">false</prop>
        <prop key="hibernate.connection.autocommit">false</prop>
      </props>
    </property>
  </bean>

  <bean id="transactionManager" class="org.springframework.orm.hibernate4.HibernateTransactionManager">
    <property name="sessionFactory" ref="sessionFactory" />
  </bean>

  <bean id="oracleDao" class="com.mydomain.dao.OracleDao" />

  <tx:annotation-driven />
</beans>

版本:

  • 休眠 4.3.6
  • 春天 4.1.0
  • 光CP 2.1.0
  • Java 8u25

我不认为它正在等待其他线程,因为取出 parallel() 对暂停的长度没有任何影响。

设置 Hibernate 批处理大小而不是使用手动批处理导致日志中根本没有提及批处理。

我错过了什么?

ETA 2:如果我暂停它,这是一个典型的堆栈:

SocketInputStream.socketRead0(FileDescriptor, byte[], int, int, int) line: not available [native method]    
SocketInputStream.read(byte[], int, int, int) line: 150 
SocketInputStream.read(byte[], int, int) line: 121  
DataPacket(Packet).receive() line: 308  
DataPacket.receive() line: 106  
NetInputStream.getNextPacket() line: 324    
NetInputStream.read(byte[], int, int) line: 268 
NetInputStream.read(byte[]) line: 190   
NetInputStream.read() line: 107 
T4CSocketInputStreamWrapper.readNextPacket() line: 124  
T4CSocketInputStreamWrapper.read() line: 80 
T4CMAREngine.unmarshalUB1() line: 1137  
T4C8Oall(T4CTTIfun).receive() line: 350 
T4C8Oall(T4CTTIfun).doRPC() line: 227   
T4C8Oall.doOALL(boolean, boolean, boolean, boolean, boolean, OracleStatement$SqlKind, int, byte[], int, Accessor[], int, Accessor[], int, byte[], char[], short[], int, DBConversion, byte[], InputStream[][], byte[][][], OracleTypeADT[][], OracleStatement, byte[], char[], short[], T4CTTIoac[], int[], int[], int[], NTFDCNRegistration) line: 531 
T4CPreparedStatement.doOall8(boolean, boolean, boolean, boolean, boolean) line: 208 
T4CPreparedStatement.executeForDescribe() line: 886 
T4CPreparedStatement(OracleStatement).executeMaybeDescribe() line: 1175 
T4CPreparedStatement(OracleStatement).doExecuteWithTimeout() line: 1296 
T4CPreparedStatement(OraclePreparedStatement).executeInternal() line: 3613  
T4CPreparedStatement(OraclePreparedStatement).executeQuery() line: 3657 
OraclePreparedStatementWrapper.executeQuery() line: 1495    
PreparedStatementJavassistProxy(PreparedStatementProxy).executeQuery() line: 44 
ResultSetReturnImpl.extract(PreparedStatement) line: 82 
SingleTableEntityPersister(AbstractEntityPersister).getDatabaseSnapshot(Serializable, SessionImplementor) line: 1533    
StatefulPersistenceContext.getDatabaseSnapshot(Serializable, EntityPersister) line: 316 
ForeignKeys.isTransient(String, Object, Boolean, SessionImplementor) line: 255  
DefaultSaveOrUpdateEventListener(AbstractSaveEventListener).getEntityState(Object, String, EntityEntry, SessionImplementor) line: 511   
DefaultSaveOrUpdateEventListener.performSaveOrUpdate(SaveOrUpdateEvent) line: 100   
DefaultSaveOrUpdateEventListener.onSaveOrUpdate(SaveOrUpdateEvent) line: 90 
SessionImpl.fireSaveOrUpdate(SaveOrUpdateEvent) line: 684   
SessionImpl.saveOrUpdate(String, Object) line: 676  
SessionImpl.saveOrUpdate(Object) line: 671  
OracleDao.lambda$0(Session, ?) line: 32 
1717941961.accept(Object) line: not available   
ArrayList$ArrayListSpliterator<E>.forEachRemaining(Consumer<? super E>) line: 1374  
ReferencePipeline$Head<E_IN,E_OUT>.forEach(Consumer<? super E_OUT>) line: 580   
OracleDao.saveBatch(List<?>) line: 32   
OracleDao$$FastClassBySpringCGLIB$$9cbb7611.invoke(int, Object, Object[]) line: not available   
MethodProxy.invoke(Object, Object[]) line: 204  
CglibAopProxy$CglibMethodInvocation.invokeJoinpoint() line: 717 
CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 157 
TransactionInterceptor$1.proceedWithInvocation() line: 98   
TransactionInterceptor(TransactionAspectSupport).invokeWithinTransaction(Method, Class<?>, InvocationCallback) line: 266    
TransactionInterceptor.invoke(MethodInvocation) line: 95    
CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 179 
CglibAopProxy$DynamicAdvisedInterceptor.intercept(Object, Method, Object[], MethodProxy) line: 653  
OracleDao$$EnhancerBySpringCGLIB$$f5d535ab.saveBatch(List) line: not available  
Importer.lambda$9(OracleDao, List<MdbData>) line: 77    
626211770.accept(Object) line: not available    
ForEachOps$ForEachOp$OfRef<T>.accept(T) line: 183   
IntPipeline$4$1.accept(int) line: 250   
Streams$RangeIntSpliterator.forEachRemaining(IntConsumer) line: 110 
Streams$RangeIntSpliterator(Spliterator$OfInt).forEachRemaining(Consumer<Integer>) line: 693    
IntPipeline$4(AbstractPipeline<E_IN,E_OUT,S>).copyInto(Sink<P_IN>, Spliterator<P_IN>) line: 512 
ForEachOps$ForEachTask<S,T>.compute() line: 290 
ForEachOps$ForEachTask<S,T>(CountedCompleter<T>).exec() line: 731   
ForEachOps$ForEachTask<S,T>(ForkJoinTask<V>).doExec() line: 289 
ForkJoinPool$WorkQueue.runTask(ForkJoinTask<?>) line: 902   
ForkJoinPool.scan(ForkJoinPool$WorkQueue, int) line: 1689   
ForkJoinPool.runWorker(ForkJoinPool$WorkQueue) line: 1644   
ForkJoinWorkerThread.run() line: 157    

啊哈,你说,它正在等待阅读,所以它一定是网络问题。但是,使用 sqlldr 从文本文件导入(这是我试图替换的痛苦手动过程的一部分),在同一主机上运行,​​访问同一数据库,运行速度要快几个数量级。所以我不认为这是连接速度的问题。

4

0 回答 0