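I am working on a project built with the Spring Integration framework, using Atomikos for distributed transactions. Recently we have been trying to get integration tests running to verify that messages are sent through our system correctly. While running one of these tests, I noticed that we get 10 log messages indicating that a new transaction is being created and 10 log messages indicating a transaction commit.
Does Spring create a new transaction each time a message is passed from a channel to an endpoint, and vice versa?
The code below accepts a message on the message-driven-channel-adapter (which uses a transaction manager) and sends it to a router. The router then sends the message to a chain that contains a transformer, a service activator (with retryAdvice), and an outbound channel adapter.
AFAIK, I should see one transaction created when our message-driven-channel-adapter receives a message from the queue, and a commit once it has finished handling that message. So I figured there would be 3 transactions in total: one from sending the message in the test, one from the inbound adapter, and one from receiving the message in the test.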
From spring-datasource.xml
<bean id="transactionManager"
      class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="AtomikosTransactionManager" />
    <property name="userTransaction" ref="AtomikosUserTransaction" />
</bean>
From spring-context.xml
<jms:message-driven-channel-adapter
    id="inventoryQueue_inbound_adapter"
    destination-name="queue.inventory"
    channel="InventoryRouterChannel"
    error-channel="inventoryErrorChannel"
    transaction-manager="transactionManager"
    acknowledge="transacted"/>

<integration:router input-channel="InventoryRouterChannel">
    <bean class="com.inventory.InventoryRouter"/>
</integration:router>

<integration:chain input-channel="rxAddToCountWell">
    <integration:json-to-object-transformer type="com.events.RxAddToCountWell"/>
    <integration:service-activator ref="addToCountWellHandler" method="formatCountwellMessage">
        <integration:request-handler-advice-chain>
            <ref bean="retryAdvice"/>
        </integration:request-handler-advice-chain>
    </integration:service-activator>
    <jms:outbound-channel-adapter destination-name="OUTBOUND.QUEUE"/>
</integration:chain>
<bean id="retryAdvice" class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
    <property name="recoveryCallback">
        <bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
            <constructor-arg ref="inventoryErrorChannel"/>
        </bean>
    </property>
    <property name="retryTemplate">
        <bean class="org.springframework.retry.support.RetryTemplate">
            <property name="retryPolicy">
                <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                    <property name="maxAttempts" value="2"/>
                </bean>
            </property>
            <property name="backOffPolicy">
                <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                    <property name="initialInterval" value="1000"/>
                    <property name="multiplier" value="2"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>
Message endpoint
@MessageEndpoint
public class AddToCountWellHandler {
    public static final Logger logger = Logger.getLogger(AddToCountWellHandler.class);

    public Message<String> formatCountwellMessage(RxAddToCountWell payload) {
        //our logic here
        //...
        return MessageBuilder.withPayload(temp).build();
    }
}
Test method
@Test
@DirtiesContext
public void addToCountWellIntegrationTest() throws InterruptedException, SystemException, NotSupportedException, HeuristicRollbackException, HeuristicMixedException, RollbackException {
    // Send the message to the handler
    transactionManager.getTransactionManager().begin();
    jmsTemplate.send("queue.inventory", new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            TextMessage message;
            try {
                message = session.createTextMessage(getJson("./doc/event_json_examples/inventory/rxAddToCountWell.json"));
            } catch (Exception e) {
                //...
            }
            message.setJMSType("rxAddToCountWell");
            return message;
        }
    });
    transactionManager.getTransactionManager().commit();

    transactionManager.getTransactionManager().begin();
    //verify that it was placed on the queue
    TextMessage output = (TextMessage) jmsTemplate.receive(COUNTWELL_QUEUE_NAME);
    assertNotNull(output);
    transactionManager.getTransactionManager().commit();
    appContext.close();
}
Logs
2014-04-08 13:55:35,018 [INFO ] org.springframework.transaction.jta.JtaTransactionManager - Using JTA UserTransaction: com.atomikos.icatch.jta.UserTransactionImp@188c838 (org.springframework.transaction.jta.JtaTransactionManager.checkUserTransactionAndTransactionManager(JtaTransactionManager.java:471))
2014-04-08 13:55:35,019 [INFO ] org.springframework.transaction.jta.JtaTransactionManager - Using JTA TransactionManager: com.atomikos.icatch.jta.UserTransactionManager@111089b (org.springframework.transaction.jta.JtaTransactionManager.checkUserTransactionAndTransactionManager(JtaTransactionManager.java:482))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#2]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,171 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#3]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,171 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:37,619 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:37,631 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#2]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:38,656 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:38,658 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:39,667 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:39,670 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:40,705 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:40,706 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:40,748 [DEBUG] com.inventory.InventoryRouter - Routing jms_type: rxAddToCountWell (com.inventory.InventoryRouter.route(InventoryRouter.java:25))
2014-04-08 13:55:40,834 [DEBUG] org.springframework.retry.support.RetryTemplate - Retry: count=0 (org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:251))
2014-04-08 13:55:40,835 [DEBUG] com.inventory.AddToCountWellHandler - Entered formatCountwellMessage... (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:31))
2014-04-08 13:55:41,179 [DEBUG] com.inventory.AddToCountWellHandler - Leaving formatCountwellMessage (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:38))
2014-04-08 13:55:41,190 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:41,193 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#3]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:43,396 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:44,406 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:45,422 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:46,448 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:47,453 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
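
To make it easier to see where the 10 transactions come from, the log lines can be tallied by the transaction name that appears in the "Creating new transaction with name [...]" messages. The sketch below is only an illustration: the class name TxLogTally and its two methods are hypothetical helpers, not part of Spring or Atomikos, and the sample lines in main are abbreviated copies of the messages above. In the log above every transaction is named after a DefaultMessageListenerContainer instance (#0 through #4), which may suggest that the listener containers themselves start these transactions rather than each channel hop, but that reading is an assumption and not something the log proves on its own.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for this question only; not a Spring or Atomikos API.
public class TxLogTally {

    // Captures the name between the square brackets of a
    // "Creating new transaction with name [...]" log message.
    private static final Pattern CREATED =
            Pattern.compile("Creating new transaction with name \\[([^\\]]+)\\]");

    /** Counts "Creating new transaction" lines, grouped by transaction name. */
    public static Map<String, Integer> countCreatedByName(List<String> logLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = CREATED.matcher(line);
            if (m.find()) {
                counts.merge(m.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    /** Counts lines reporting that a commit was initiated. */
    public static long countCommits(List<String> logLines) {
        return logLines.stream()
                .filter(line -> line.contains("Initiating transaction commit"))
                .count();
    }

    public static void main(String[] args) {
        // Abbreviated sample lines; in practice, read the real log file instead.
        List<String> sample = List.of(
                "[DEBUG] JtaTransactionManager - Creating new transaction with name "
                        + "[org.springframework.jms.listener.DefaultMessageListenerContainer#2]: "
                        + "PROPAGATION_REQUIRED,ISOLATION_DEFAULT",
                "[DEBUG] JtaTransactionManager - Initiating transaction commit",
                "[DEBUG] JtaTransactionManager - Creating new transaction with name "
                        + "[org.springframework.jms.listener.DefaultMessageListenerContainer#2]: "
                        + "PROPAGATION_REQUIRED,ISOLATION_DEFAULT",
                "[DEBUG] JtaTransactionManager - Initiating transaction commit");

        // Print one line per transaction name with its creation count.
        countCreatedByName(sample).forEach(
                (name, count) -> System.out.println(name + " -> " + count));
        System.out.println("commits: " + countCommits(sample));
    }
}
```

Run against the full log above, a tally like this would group the 10 "Creating new transaction" lines under the five container names, which is a quick way to check whether the count tracks the number of listener containers rather than the number of channels and endpoints a message passes through.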