0

我有一个 Spring 配置的 Web 应用程序,它通过以下侦听器接收 JMS 消息:

public class EntityPersister implements MessageListener {

    @Resource
    private EntityManager entityManager;

    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            Object entity = createEntity(textMessage);
            entityManager.persist(entity);
            entityManager.flush(); //for debugging only
        }
    }
}

当我在我的应用程序中执行此侦听器时,我NoTransactionException从行中得到一个entityManager.flush().

我需要配置什么才能让实体管理器参与到已经存在的 JTA 事务中?

我已经尝试@Transactional过上述实现,但没有成功。

ActiveMQ 用作 JMS 提供程序。弹簧配置为:

<bean id="jmsConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName" value="atomikos-activemq" />
    <property name="xaConnectionFactory">
        <!-- ActiveMQ wird als JMS Provider genutzt -->
        <bean id="activeMQXAConnectionFactory"
            class="org.apache.activemq.spring.ActiveMQXAConnectionFactory">
            <property name="brokerURL">
                <value>tcp://localhost:61616</value>
            </property>
        </bean>
    </property>
    <property name="maxPoolSize" value="2" />
    <property name="localTransactionMode" value="false" />
</bean>

<bean id="entityPersister" class="EntityPersister" />

<bean id="jmsContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="destinationName" ref="entityDestinationName" />
    <property name="messageListener" ref="entityPersister" />
    <property name="sessionTransacted" value="true" />
    <property name="transactionManager" ref="txManager" />
</bean>

OpenJPA 用作 JPA 提供者。持久性单位是:

<persistence-unit name="somePU" transaction-type="JTA">
    <jta-data-source>managedDataSource</jta-data-source>
    <non-jta-data-source>nonManagedDataSource</non-jta-data-source>
    <!-- some entity class listed here -->
    <properties>
        <property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema(ForeignKeys=true)" />
    </properties>
</persistence-unit>

弹簧配置为:

<!-- Construct Atomikos UserTransactionManager, needed to configure Spring -->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
    init-method="init" destroy-method="close">
    <!-- when close is called, should we force transactions to terminate or 
        not? -->
    <property name="forceShutdown" value="true" />
</bean>

<!-- Also use Atomikos UserTransactionImp, needed to configure Spring -->
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="300" />
</bean>

<!-- Configure the Spring framework to use JTA transactions from Atomikos -->
<bean id="txManager"
    class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="atomikosTransactionManager" />
    <property name="userTransaction" ref="atomikosUserTransaction" />
</bean>

<!-- enable the configuration of transactional behavior based on annotations -->
<tx:annotation-driven transaction-manager="txManager" />

<bean id="entityManagerFactory"
    class="org.springframework.orm.jpa.LocalEntityManagerFactoryBean">
    <property name="persistenceUnitName" value="somePU" />
    <property name="jpaPropertyMap">
        <map>
            <entry key="openjpa.ManagedRuntime" value="jndi" />
        </map>
    </property>
</bean>

<bean id="entityManager" factory-bean="entityManagerFactory"
    factory-method="createEntityManager" />

OpenJPA 通过 JNDI 从持久性单元中查找 XA 和非 XA 数据源。

4

1 回答 1

2

如果您正在使用setMessageListener(),则消息接收在onMessage单独的线程中异步发生。

注入entityManager发生在不同的线程中。通常,事务是线程绑定的。因此,在消息接收开始之前,注入线程可能已经完成了它的工作,并且它的关联事务已经关闭。即使该事务仍然存在,if 也不会被加入到消息接收线程使用的同一全局事务中。

您可以通过显式创建/获取entityManagerin来验证它onMessage。这样,所有 XA 事务感知资源都应该在同一个全局事务中登记,因为所有这些资源都是从同一个线程打开的。

于 2013-09-02T13:08:45.290 回答