1

我正在开发一个使用 Spring Integration 框架和 atomikos 进行分布式事务构建的项目。最近我们一直在尝试运行集成测试来验证消息是否正确地通过我们的系统发送。在执行其中一个集成测试时,我注意到我们收到 10 条日志消息,指示正在创建新事务,还有 10 条日志消息指示事务提交。

每次消息从通道传递到端点时,Spring 是否都会创建新事务,反之亦然?

下面的代码接受消息驱动通道适配器上的消息(使用事务管理器)并发送到路由器。然后路由器将消息发送到包含转换器、服务激活器(带有 retryAdvice)和出站消息适配器的链。

AFAIK,当我们的消息驱动通道适配器从队列接收消息时,我应该看到一个事务创建,然后在它完成处理消息时提交。所以我认为总共会有 3 笔交易。一种来自在测试中发送消息,一种来自入站适配器,另一种来自在测试中接收消息。

来自 spring-datasource.xml

<bean id="transactionManager"
      class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="AtomikosTransactionManager" />
    <property name="userTransaction" ref="AtomikosUserTransaction" />
</bean>

来自 spring-context.xml

<jms:message-driven-channel-adapter
        id="inventoryQueue_inbound_adapter"
        destination-name="queue.inventory"
        channel="InventoryRouterChannel"
        error-channel="inventoryErrorChannel"
        transaction-manager="transactionManager"
        acknowledge="transacted"/>

<integration:router input-channel="InventoryRouterChannel">
    <bean class="com.inventory.InventoryRouter"/>
</integration:router>

<integration:chain input-channel="rxAddToCountWell">
    <integration:json-to-object-transformer type="com.events.RxAddToCountWell"/>
    <integration:service-activator ref="addToCountWellHandler" method="formatCountwellMessage">
        <integration:request-handler-advice-chain>
            <ref bean="retryAdvice"/>
        </integration:request-handler-advice-chain>
    </integration:service-activator>
    <jms:outbound-channel-adapter destination-name="OUTBOUND.QUEUE"/>
</integration:chain>

<bean id="retryAdvice" class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
    <property name="recoveryCallback">
        <bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
            <constructor-arg ref="inventoryErrorChannel"/>
        </bean>
    </property>
    <property name="retryTemplate">
        <bean class="org.springframework.retry.support.RetryTemplate">
            <property name="retryPolicy">
                <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                    <property name="maxAttempts" value="2"/>
                </bean>
            </property>
            <property name="backOffPolicy">
                <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                    <property name="initialInterval" value="1000"/>
                    <property name="multiplier" value="2"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

消息端点

@MessageEndpoint
public class AddToCountWellHandler {

    public static final Logger logger = Logger.getLogger(AddToCountWellHandler.class);

    public Message<String> formatCountwellMessage(RxAddToCountWell payload) {
        //our logic here
        //...
        return MessageBuilder.withPayload(temp).build();
    }
}

测试方法

@Test
@DirtiesContext
public void addToCountWellIntegrationTest() throws InterruptedException, SystemException, NotSupportedException, HeuristicRollbackException, HeuristicMixedException, RollbackException {

    // Send the message to the handler
    transactionManager.getTransactionManager().begin();
    jmsTemplate.send("queue.inventory", new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            TextMessage message;
            try {
                message = session.createTextMessage(getJson("./doc/event_json_examples/inventory/rxAddToCountWell.json"));
            } catch (Exception e) {
                //...
            }
            message.setJMSType("rxAddToCountWell");
            return message;
        }
    });
    transactionManager.getTransactionManager().commit();

    transactionManager.getTransactionManager().begin();

    //verify that it was placed on the queue
    TextMessage output = (TextMessage) jmsTemplate.receive(COUNTWELL_QUEUE_NAME);
    assertNotNull(output);

    transactionManager.getTransactionManager().commit();
    appContext.close();
}

日志

2014-04-08 13:55:35,018 [INFO ] org.springframework.transaction.jta.JtaTransactionManager - Using JTA UserTransaction: com.atomikos.icatch.jta.UserTransactionImp@188c838 (org.springframework.transaction.jta.JtaTransactionManager.checkUserTransactionAndTransactionManager(JtaTransactionManager.java:471))
2014-04-08 13:55:35,019 [INFO ] org.springframework.transaction.jta.JtaTransactionManager - Using JTA TransactionManager: com.atomikos.icatch.jta.UserTransactionManager@111089b (org.springframework.transaction.jta.JtaTransactionManager.checkUserTransactionAndTransactionManager(JtaTransactionManager.java:482))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#2]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,171 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#3]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,171 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:37,619 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:37,631 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#2]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:38,656 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:38,658 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:39,667 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:39,670 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:40,705 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:40,706 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:40,748 [DEBUG] com.inventory.InventoryRouter - Routing jms_type: rxAddToCountWell (com.inventory.InventoryRouter.route(InventoryRouter.java:25))
2014-04-08 13:55:40,834 [DEBUG] org.springframework.retry.support.RetryTemplate - Retry: count=0 (org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:251))
2014-04-08 13:55:40,835 [DEBUG] com.inventory.AddToCountWellHandler - Entered formatCountwellMessage... (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:31))
2014-04-08 13:55:41,179 [DEBUG] com.inventory.AddToCountWellHandler - Leaving formatCountwellMessage (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:38))
2014-04-08 13:55:41,190 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:41,193 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#3]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:43,396 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:44,406 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:45,422 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:46,448 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:47,453 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
4

2 回答 2

1

我已经确定发生了什么。当我加载我的测试上下文配置时,我正在加载一个导入所有 5 个消息驱动通道适配器的文件。正如 Gary 所说,每个空闲线程都必须启动一个事务。所以适配器都启动了事务,但只有一个实际收到了消息。

我发现修改我的测试上下文以仅加载特定的适配器/路由导致仅使用单个线程。

2014-04-08 17:16:38,322 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 17:16:38,492 [DEBUG] com.inventory.InventoryRouter - Routing jms_type: rxAddToCountWell (com.inventory.InventoryRouter.route(InventoryRouter.java:25))
2014-04-08 17:16:38,555 [DEBUG] org.springframework.retry.support.RetryTemplate - Retry: count=0 (org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:251))
2014-04-08 17:16:38,595 [DEBUG] com.inventory.AddToCountWellHandler - Entered formatCountwellMessage... (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:31))
2014-04-08 17:16:42,198 [DEBUG] com.inventory.AddToCountWellHandler - Leaving formatCountwellMessage (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:38))
2014-04-08 17:16:42,205 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 17:16:42,210 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 17:16:43,219 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
于 2014-04-08T22:27:54.673 回答
0

它与您的配置不匹配,但看起来您有 5 个侦听器线程 ( #0..#4)。其中 4 个在处理消息时执行无操作事务(不接收消息)#3。空闲线程必须在接收“以防”有消息之前启动事务。

由于没有在这些线程上完成任何工作,因此提交应该是轻量级的;您可以增加接收超时以最小化这种情况,但代价是关闭容器可能需要更长的时间(因为线程在 JMS 客户端库中被阻塞)。

默认receiveTimeout值为 1 秒。

于 2014-04-08T20:03:39.580 回答