
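I have a Spring MVC application, and I want to use Spring AMQP to run my asynchronous tasks. My REST backend uses the @Transactional annotation to manage transactions. After it finishes its work, it pushes a task to the exchange and returns. The task is then picked up by the consumer, which fails on the first attempt with "org.hibernate.HibernateException: No Session found for current thread". Spring AMQP then redelivers the task to the consumer, and this time it works without any session error.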

How can I make this work on the first attempt?

My Spring AMQP configuration is similar to this one (http://projects.spring.io/spring-amqp/):

<rabbit:connection-factory id="connectionFactory" />

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
    exchange="myExchange" routing-key="foo.bar"/>

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue name="myQueue" />

<rabbit:topic-exchange name="myExchange">
    <rabbit:bindings>
        <rabbit:binding queue="myQueue" pattern="foo.*" />
    </rabbit:bindings>
</rabbit:topic-exchange>


<rabbit:listener-container connection-factory="connectionFactory"  advice-chain="retryInterceptor">
    <rabbit:listener ref="foo" method="listen" queue-names="myQueue" />
</rabbit:listener-container>

<bean id="foo" class="foo.Foo" />

<bean id="retryInterceptor" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer"/>
    <property name="retryOperations" ref="retryTemplate"/>
    <property name="messageKeyGenerator" ref="bodyBasedKeyGenerator"/>
</bean>
<bean id="bodyBasedKeyGenerator" class="com.mydomain.util.BodyBasedKeyGenerator"/>
<bean id="rejectAndDontRequeueRecoverer" class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="5000"/>
            <property name="maxInterval" value="120000"/>
            <property name="multiplier" value="2"/>
        </bean>
    </property>
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="1"/>
        </bean>
    </property>
</bean>          

Note: the foo.Foo listen() method is also annotated with @Transactional.

My transaction manager configuration is as follows:

<bean id="mySessionFactory" class="org.springframework.orm.hibernate4.LocalSessionFactoryBean">
    <property name="dataSource" ref="myDataSource"/>
    <property name="packagesToScan" value="com.mydomain.bean"/>
    <property name="hibernateProperties">
        <props>
            <prop key="hibernate.dialect">org.hibernate.spatial.dialect.postgis.PostgisDialect
            </prop>
            <prop key="hibernate.show_sql">false</prop>
            <prop key="hibernate.hbm2ddl.auto">update</prop>
        </props>
    </property>
</bean>

<bean id="transactionManager" class="org.springframework.orm.hibernate4.HibernateTransactionManager">
    <property name="sessionFactory" ref="mySessionFactory"/>
</bean>

<tx:annotation-driven transaction-manager="transactionManager"/>

Note: the session factory is initialized deep inside the DAO factory. I use the same DAO factory in both the REST service and the Rabbit consumer.

My error log is as follows:

06:48:16,141 DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Storing delivery for Consumer: tag=[amq.ctag-K3ZRHRBVttW20ROENd9l4g], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@192.168.1.21:5672/,3), acknowledgeMode=AUTO local queue size=0
06:48:16,142 DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Received message: (Body:'73'; ID:null; Content:application/x-java-serialized-object; Headers:{}; Exchange:exchange; RoutingKey:foo.dummy.dummy2; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:1)
06:48:16,143 DEBUG [org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor] - Executing proxied method in stateful retry: public abstract void org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$ContainerDelegate.invokeListener(com.rabbitmq.client.Channel,org.springframework.amqp.core.Message) throws java.lang.Exception(2fe83585)
06:48:16,146 TRACE [org.springframework.retry.support.RetryTemplate] - RetryContext retrieved: [RetryContext: count=0, lastException=null, exhausted=false]
06:48:16,146 DEBUG [org.springframework.retry.support.RetryTemplate] - Retry: count=0
06:48:16,147 DEBUG [org.springframework.beans.factory.support.DefaultListableBeanFactory] - Returning cached instance of singleton bean 'transactionManager'
06:48:16,147 DEBUG [org.springframework.orm.hibernate4.HibernateTransactionManager] - Creating new transaction with name [foo.Foo.listen]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; ''
06:48:16,147 TRACE [org.hibernate.internal.SessionImpl] - Opened session at timestamp: 13881430961
06:48:16,147 DEBUG [org.springframework.orm.hibernate4.HibernateTransactionManager] - Opened new Session [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=[] updates=[] deletions=[] collectionCreations=[] collectionRemovals=[] collectionUpdates=[] unresolvedInsertDependencies=UnresolvedEntityInsertActions[]])] for Hibernate transaction
06:48:16,147 DEBUG [org.springframework.orm.hibernate4.HibernateTransactionManager] - Preparing JDBC Connection of Hibernate Session [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=[] updates=[] deletions=[] collectionCreations=[] collectionRemovals=[] collectionUpdates=[] unresolvedInsertDependencies=UnresolvedEntityInsertActions[]])]
06:48:16,147 DEBUG [org.hibernate.engine.transaction.spi.AbstractTransactionImpl] - begin
06:48:16,147 DEBUG [org.hibernate.engine.jdbc.internal.LogicalConnectionImpl] - Obtaining JDBC connection
06:48:16,147 DEBUG [org.hibernate.engine.jdbc.internal.LogicalConnectionImpl] - Obtained JDBC connection
06:48:16,147 DEBUG [org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction] - initial autocommit status: true
06:48:16,147 DEBUG [org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction] - disabling autocommit
06:48:16,147 DEBUG [org.springframework.orm.hibernate4.HibernateTransactionManager] - Exposing Hibernate transaction as JDBC transaction [org.hibernate.engine.jdbc.internal.proxy.ConnectionProxyHandler@60c58418[valid=true]]
06:48:16,147 TRACE [org.springframework.transaction.support.TransactionSynchronizationManager] - Bound value [org.springframework.jdbc.datasource.ConnectionHolder@52a971e3] for key [org.apache.commons.dbcp.BasicDataSource@3c250cce] to thread [SimpleAsyncTaskExecutor-1]
06:48:16,147 TRACE [org.springframework.transaction.support.TransactionSynchronizationManager] - Bound value [org.springframework.orm.hibernate4.SessionHolder@7274187a] for key [org.hibernate.internal.SessionFactoryImpl@68e26d2e] to thread [SimpleAsyncTaskExecutor-1]
06:48:16,147 TRACE [org.springframework.transaction.support.TransactionSynchronizationManager] - Initializing transaction synchronization
06:48:16,147 TRACE [org.springframework.transaction.interceptor.TransactionInterceptor] - Getting transaction for [foo.Foo.listen]
06:48:16,153 DEBUG [org.springframework.beans.factory.support.DefaultListableBeanFactory] - Returning cached instance of singleton bean 'org.springframework.cache.interceptor.CacheInterceptor#0'
06:48:16,153 TRACE [org.hibernate.internal.SessionImpl] - Opened session at timestamp: 13881430961
06:48:16,154 TRACE [org.springframework.transaction.interceptor.TransactionInterceptor] - Completing transaction for [foo.Foo.listen] after exception: org.hibernate.HibernateException: No Session found for current thread
06:48:16,154 TRACE [org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] - Applying rules to determine whether transaction should rollback on org.hibernate.HibernateException: No Session found for current thread
06:48:16,154 TRACE [org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] - Winning rollback rule is: null
06:48:16,154 TRACE [org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] - No relevant rollback rule found: applying default rules
06:48:16,154 TRACE [org.springframework.orm.hibernate4.HibernateTransactionManager] - Triggering beforeCompletion synchronization
06:48:16,154 DEBUG [org.springframework.orm.hibernate4.HibernateTransactionManager] - Initiating transaction rollback
06:48:16,154 DEBUG [org.springframework.orm.hibernate4.HibernateTransactionManager] - Rolling back Hibernate transaction on Session [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=[] updates=[] deletions=[] collectionCreations=[] collectionRemovals=[] collectionUpdates=[] unresolvedInsertDependencies=UnresolvedEntityInsertActions[]])]
06:48:16,154 DEBUG [org.hibernate.engine.transaction.spi.AbstractTransactionImpl] - rolling back
06:48:16,154 DEBUG [org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction] - rolled JDBC Connection
06:48:16,154 DEBUG [org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction] - re-enabling autocommit
06:48:16,154 TRACE [org.hibernate.engine.transaction.internal.TransactionCoordinatorImpl] - after transaction completion
06:48:16,154 TRACE [org.hibernate.internal.SessionImpl] - after transaction completion
06:48:16,154 TRACE [org.springframework.orm.hibernate4.HibernateTransactionManager] - Triggering afterCompletion synchronization
06:48:16,154 TRACE [org.springframework.transaction.support.TransactionSynchronizationManager] - Clearing transaction synchronization
06:48:16,154 TRACE [org.springframework.transaction.support.TransactionSynchronizationManager] - Removed value [org.springframework.orm.hibernate4.SessionHolder@7274187a] for key [org.hibernate.internal.SessionFactoryImpl@68e26d2e] from thread [SimpleAsyncTaskExecutor-1]
06:48:16,154 TRACE [org.springframework.transaction.support.TransactionSynchronizationManager] - Removed value [org.springframework.jdbc.datasource.ConnectionHolder@52a971e3] for key [org.apache.commons.dbcp.BasicDataSource@3c250cce] from thread [SimpleAsyncTaskExecutor-1]
06:48:16,154 TRACE [org.hibernate.engine.jdbc.internal.proxy.ConnectionProxyHandler] - Handling invocation of connection method [isReadOnly]
06:48:16,154 DEBUG [org.springframework.orm.hibernate4.HibernateTransactionManager] - Closing Hibernate Session [SessionImpl(PersistenceContext[entityKeys=[],collectionKeys=[]];ActionQueue[insertions=[] updates=[] deletions=[] collectionCreations=[] collectionRemovals=[] collectionUpdates=[] unresolvedInsertDependencies=UnresolvedEntityInsertActions[]])] after transaction
06:48:16,154 TRACE [org.hibernate.internal.SessionImpl] - Closing session
06:48:16,154 TRACE [org.hibernate.engine.jdbc.internal.LogicalConnectionImpl] - Closing logical connection
06:48:16,154 TRACE [org.hibernate.engine.jdbc.internal.JdbcResourceRegistryImpl] - Closing JDBC container [org.hibernate.engine.jdbc.internal.JdbcResourceRegistryImpl@5ef23a26]
06:48:16,154 DEBUG [org.hibernate.engine.jdbc.internal.LogicalConnectionImpl] - Releasing JDBC connection
06:48:16,154 DEBUG [org.hibernate.engine.jdbc.internal.LogicalConnectionImpl] - Released JDBC connection
06:48:16,154 DEBUG [org.hibernate.engine.jdbc.internal.proxy.ConnectionProxyHandler] - HHH000163: Logical connection releasing its physical connection
06:48:16,154 DEBUG [org.hibernate.engine.jdbc.internal.proxy.ConnectionProxyHandler] - HHH000163: Logical connection releasing its physical connection
06:48:16,154 DEBUG [org.hibernate.engine.jdbc.internal.proxy.ConnectionProxyHandler] - HHH000163: Logical connection releasing its physical connection
06:48:16,154 TRACE [org.hibernate.engine.jdbc.internal.LogicalConnectionImpl] - Logical connection closed
06:48:16,155 DEBUG [org.springframework.retry.support.RetryTemplate] - Checking for rethrow: count=1
06:48:16,155 DEBUG [org.springframework.retry.support.RetryTemplate] - Rethrow in retry for policy: count=1
org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener method 'listen' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:457)
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:358)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:546)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:61)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:110)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor$MethodInvocationRetryCallback.doWithRetry(StatefulRetryOperationsInterceptor.java:173)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:255)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:188)
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:145)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
    at $Proxy51.invokeListener(Unknown Source)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:611)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:454)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:474)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:458)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:61)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:551)
    at java.lang.Thread.run(Thread.java:662)
Caused by: org.hibernate.HibernateException: No Session found for current thread
    at org.springframework.orm.hibernate4.SpringSessionContext.currentSession(SpringSessionContext.java:97)
    at org.hibernate.internal.SessionFactoryImpl.getCurrentSession(SessionFactoryImpl.java:978)
    at com.mydomain.dao.ListingDaoImpl.getListing(ListingDaoImpl.java:38)
    at com.mydomain.dataaccess.DataProvider.getListing(DataProvider.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:198)
    at $Proxy29.getListing(Unknown Source)
    at foo.Foo.listen(Foo.java:32)
    at foo.Foo$$FastClassByCGLIB$$1e4b772c.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:698)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:631)
    at foo.Foo$$EnhancerByCGLIB$$714aa208.listen(<generated>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:273)
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:451)
    ... 26 more

The listener code is as follows:

package foo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;

@Transactional(readOnly = true)
public class Foo {

    @Autowired
    private IDataProvider dataProvider;

    @Transactional
    public void listen(Long listingId) throws Exception {

        dataProvider.getListing(listingId);
    }
}

The code of the REST backend is similar to this:

@Controller
@Transactional(readOnly = true)
@RequestMapping(value = { "/myController" })
public class MyController extends BaseController {

    @Autowired
    private RabbitTemplate exchangeTemplate;

    @Transactional(readOnly = false)
    @RequestMapping(method = RequestMethod.POST)
    public @ResponseBody MyClass update(@RequestBody MyClass myclass, HttpServletRequest request) throws Exception {
        getAuthorizationManager().authorizeUserFromRequest(request);
        List<MyClass> result = getDataProvider().update(myclass);
        exchangeTemplate.convertAndSend("foo.Foo.update", myclass.getId());
        return result;
    }   
}

2 Answers


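OK. Now I see where the problem is. You are using LocalSessionFactoryBean, which is based on SpringSessionContext, which in turn expects to find a SessionHolder among the transactional resources, stored as a ThreadLocal. But since the AMQP listener runs in its own thread, there is no hook that registers the sessionFactory in the transactional resources for that thread, nothing similar to what OpenSessionInViewFilter does for Spring MVC.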
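To make the thread-local point concrete, here is a minimal sketch in plain Java. It is not Spring's actual implementation: ThreadBoundResourceDemo and CURRENT_SESSION are made-up names that only model the idea of a resource bound to one thread and looked up from another.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ThreadBoundResourceDemo {

    // Hypothetical stand-in for a registry of thread-bound resources.
    private static final ThreadLocal<String> CURRENT_SESSION = new ThreadLocal<>();

    /** Binds a value on the calling thread, then reports what a second thread sees. */
    public static String lookupFromOtherThread() {
        CURRENT_SESSION.set("session-of-" + Thread.currentThread().getName());
        try {
            AtomicReference<String> seen = new AtomicReference<>();
            // The second thread has its own, empty slot for the same ThreadLocal.
            Thread listener = new Thread(() -> seen.set(CURRENT_SESSION.get()), "listener-thread");
            listener.start();
            listener.join();
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            CURRENT_SESSION.remove();
        }
    }

    /** The thread that bound the value can read it back. */
    public static boolean visibleOnBindingThread() {
        CURRENT_SESSION.set("session");
        try {
            return CURRENT_SESSION.get() != null;
        } finally {
            CURRENT_SESSION.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("binding thread sees a value: " + visibleOnBindingThread());
        System.out.println("other thread sees: " + lookupFromOtherThread());
    }
}
```

Whatever binds the session has to run on the same thread that later asks for it; a value bound on a request thread is simply absent on a listener thread.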

Try using

hibernate.current_session_context_class = local

in the hibernateProperties of your LocalSessionFactoryBean. I have never used LocalSessionFactoryBean myself; a JTA SessionFactory obtained from the EE container's JNDI has always worked for me in all cases.

UPDATE:

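Well, since you use the same IDataProvider from both MVC and the AMQP listener, and the problem lies in the CurrentSessionContext, which has no currentSession in the AMQP thread, you have to call sessionFactory.openSession() manually from your IDataProvider implementation code, and only when there is no currentSession in the currentSessionContext. Something like this: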

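Session hibernateSession = null;
try {
    // Use the session bound to the current thread, if there is one.
    hibernateSession = this.sessionFactory.getCurrentSession();
}
catch (HibernateException he) {
    // No session is bound to this thread (e.g. the AMQP listener thread): open one manually.
    hibernateSession = this.sessionFactory.openSession();
}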

Of course, you can write your own SpringSessionContext variant and move that code into currentSession().
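One detail the fallback leaves open is who closes a session that was opened manually. The sketch below is an illustration only: Session and SessionFactory here are hypothetical stand-in interfaces, not the Hibernate types, IllegalStateException stands in for HibernateException, and FakeFactory exists purely so the example runs without Hibernate. It closes the session only when this code opened it.

```java
public class SessionFallbackSketch {

    /** Hypothetical stand-in for org.hibernate.Session. */
    public interface Session {
        String load(long id);
        void close();
    }

    /** Hypothetical stand-in for org.hibernate.SessionFactory. */
    public interface SessionFactory {
        Session getCurrentSession(); // throws IllegalStateException when none is bound
        Session openSession();
    }

    /** Uses the current session if present; otherwise opens one and closes it afterwards. */
    public static String getListing(SessionFactory factory, long id) {
        Session session;
        boolean openedHere = false;
        try {
            session = factory.getCurrentSession();
        } catch (IllegalStateException noSessionBound) {
            session = factory.openSession();
            openedHere = true;
        }
        try {
            return session.load(id);
        } finally {
            // A session owned by the surrounding transaction must not be closed here.
            if (openedHere) {
                session.close();
            }
        }
    }

    /** Test double that counts how many sessions were opened and closed. */
    public static final class FakeFactory implements SessionFactory {
        private final boolean hasCurrent;
        public int opened;
        public int closed;

        public FakeFactory(boolean hasCurrent) {
            this.hasCurrent = hasCurrent;
        }

        private Session newSession() {
            return new Session() {
                @Override public String load(long id) { return "listing-" + id; }
                @Override public void close() { closed++; }
            };
        }

        @Override public Session getCurrentSession() {
            if (!hasCurrent) {
                throw new IllegalStateException("No Session found for current thread");
            }
            return newSession();
        }

        @Override public Session openSession() {
            opened++;
            return newSession();
        }
    }
}
```

Keeping the openedHere flag next to the lookup means the transactional path is left untouched, while the manual path does not leak sessions.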

I don't understand why the second attempt works for you. In my test case it fails every time.

I assume there is something in your configuration that takes care of the currentSession but that you haven't mentioned.

HTH

answered 2013-12-28T15:02:11.423

The problem turned out to be in the initialization of the SessionFactory. I had a static getDao function, and the SessionFactory instance was also static:

@Repository
public class DaoFactory {

    private static SessionFactory sessionFactory;

    public SessionFactory getSessionFactory() {
        return sessionFactory;
    }

    @Autowired
    public void setSessionFactory(SessionFactory sessionFactory) {
        DaoFactory.sessionFactory = sessionFactory;
    }

    public static HibernateDaoBase getDao(Class<?> className) {
     .
     .

I got it working by making the getDao function non-static and autowiring the DaoFactory class into the DataProvider class:

@Repository
public class DaoFactory {

    @Autowired
    private SessionFactory sessionFactory;

    public HibernateDaoBase getDao(Class<?> className) {
     .
     .

and

@Repository
@EnableCaching
public class DataProvider implements IDataProvider {

    @Autowired
    private DaoFactory daoFactory;
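
Why the static field mattered is not spelled out above. One possibility, offered as an assumption rather than a diagnosis, is that more than one DaoFactory instance had its setter called with different SessionFactory objects, so the last call won for everybody. The plain-Java sketch below only demonstrates that language-level effect; StaticHolder and InstanceHolder are hypothetical names, and strings stand in for SessionFactory instances.

```java
public class StaticFieldPitfall {

    /** Same shape as the first DaoFactory: instance setter writing a static field. */
    public static class StaticHolder {
        private static String sessionFactory;

        public void setSessionFactory(String sessionFactory) {
            StaticHolder.sessionFactory = sessionFactory;
        }

        public String getSessionFactory() {
            return sessionFactory;
        }
    }

    /** Same shape as the fixed DaoFactory: each instance keeps its own reference. */
    public static class InstanceHolder {
        private String sessionFactory;

        public void setSessionFactory(String sessionFactory) {
            this.sessionFactory = sessionFactory;
        }

        public String getSessionFactory() {
            return sessionFactory;
        }
    }

    /** With a static field, the second setter call overwrites what the first instance sees. */
    public static String staticResult() {
        StaticHolder first = new StaticHolder();
        StaticHolder second = new StaticHolder();
        first.setSessionFactory("factory-A");
        second.setSessionFactory("factory-B");
        return first.getSessionFactory();
    }

    /** With an instance field, the first instance keeps the value it was given. */
    public static String instanceResult() {
        InstanceHolder first = new InstanceHolder();
        InstanceHolder second = new InstanceHolder();
        first.setSessionFactory("factory-A");
        second.setSessionFactory("factory-B");
        return first.getSessionFactory();
    }

    public static void main(String[] args) {
        System.out.println("static field:   " + staticResult());
        System.out.println("instance field: " + instanceResult());
    }
}
```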
answered 2014-01-04T15:05:08.893