我正在尝试在 2 个不同的远程 activeMQ 代理之间移动 jms 消息,并且经过大量阅读
我正在使用 Atomikos,因为我正在编写一个独立的应用程序,并且我也在使用 spring 来使整个工作正常。
我有以下 bean javaconfig 设置
@Bean(name="atomikosSrcConnectionFactory")
public AtomikosConnectionFactoryBean consumerXAConnectionFactory() {
AtomikosConnectionFactoryBean consumerBean = new AtomikosConnectionFactoryBean();
consumerBean.setUniqueResourceName("atomikosSrcConnectionFactory");
consumerBean.setLocalTransactionMode(false);
return consumerBean;
}
@Bean(name="atomikosDstConnectionFactory")
public AtomikosConnectionFactoryBean producerXAConnectionFactory() {
AtomikosConnectionFactoryBean producerBean = new AtomikosConnectionFactoryBean();
producerBean.setUniqueResourceName("atomikosDstConnectionFactory");
producerBean.setLocalTransactionMode(false);
return producerBean;
}
@Bean(name="jtaTransactionManager")
public JtaTransactionManager jtaTransactionManager() throws SystemException {
JtaTransactionManager jtaTM = new JtaTransactionManager();
jtaTM.setTransactionManager(userTransactionManager());
jtaTM.setUserTransaction(userTransactionImp());
return jtaTM;
}
@Bean(initMethod="init", destroyMethod="close", name="userTransactionManager")
public UserTransactionManager userTransactionManager() {
UserTransactionManager utm = new UserTransactionManager();
utm.setForceShutdown(false);
return utm;
}
@Bean(name="userTransactionImp")
public UserTransactionImp userTransactionImp() throws SystemException {
UserTransactionImp uti = new UserTransactionImp();
uti.setTransactionTimeout(300);
return uti;
}
@Bean(name="jmsContainer")
@Lazy(value=true)
public DefaultMessageListenerContainer jmsContainer() throws SystemException {
DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
dmlc.setAutoStartup(false);
dmlc.setTransactionManager(jtaTransactionManager());
dmlc.setSessionTransacted(true);
dmlc.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
dmlc.setConnectionFactory(consumerXAConnectionFactory());
dmlc.setDestinationName("srcQueue");
return dmlc;
}
@Bean(name="transactedJmsTemplate")
public JmsTemplate transactedJmsTemplate() {
DynamicDestinationResolver dest = new DynamicDestinationResolver();
JmsTemplate jmsTmp = new JmsTemplate(producerXAConnectionFactory());
jmsTmp.setDeliveryPersistent(true);
jmsTmp.setSessionTransacted(true);
jmsTmp.setDestinationResolver(dest);
jmsTmp.setPubSubDomain(false);
jmsTmp.setReceiveTimeout(20000);
jmsTmp.setExplicitQosEnabled(true);
jmsTmp.setSessionTransacted(true);
jmsTmp.setDefaultDestination(new ActiveMQQueue("destQueue"));
jmsTmp.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return jmsTmp;
}
在我启动 DMLC 之前,2 AtomikosConnectionFactoryBean 在运行时包装了一个 ActiveMQXAConnectionFactory(每个代理一个)。
然后,我使用以下方法设置了一个简单的 messageListener(在 dmlc 启动之前分配给它):
public void onMessage(Message message) {
final Message rcvedMsg = message;
try{
MessageCreator msgCreator = new MessageCreator(){
public Message createMessage(Session session) throws JMSException{
Message returnMsg = null;
if(rcvedMsg instanceof TextMessage){
TextMessage txtMsg = session.createTextMessage();
txtMsg.setText(((TextMessage) rcvedMsg).getText());
returnMsg = txtMsg;
}
else if(rcvedMsg instanceof BytesMessage){
BytesMessage bytesMsg = session.createBytesMessage();
if(!(((BytesMessage) rcvedMsg).getBodyLength() > Integer.MAX_VALUE)){
byte[] bodyContent = new byte[(int) ((BytesMessage) rcvedMsg).getBodyLength()];
bytesMsg.writeBytes(bodyContent);
returnMsg = bytesMsg;
}
}
return returnMsg;
}
};
jmsTemplate.send(msgCreator);
}
catch(JmsException | JMSException e){
logger.error("Error when transfering message: '{}'. {}",message,e);
}
}
应用程序在没有任何特定错误或警告的情况下启动,但是一旦我将消息放入源队列中,我可以通过日志看到 onMessage 方法正在一遍又一遍地运行同一条消息,就好像事务一直在回滚并再次重新启动(不会在任何地方抛出错误)。
我还注意到,如果我碰巧使用相同的源 url 和目标 url(意味着相同的代理,但每个代理都有自己的 connectionFactory),它会工作并且消息会按预期在源队列和目标队列之间传输。
我想知道的是
- 我在设置中做错了什么?为什么我的事务“似乎”在使用 2 个不同的代理时一遍又一遍地回滚,但在使用相同的代理(但超过 2 个不同的连接工厂)时工作?
- 我不完全相信 onMessage 当前正在执行正确的事务,因为我目前正在捕获所有异常并且什么都不做,我相信这将在 jmstemplate 完成发送消息之前提交 dmlc 的事务,但我不确定。如果是这种情况,SessionAwareMessageListener 会更好吗?我应该在 onMessage 方法中设置@Transacted 吗?
任何人都可以帮助阐明这个问题吗?欢迎所有输入。
更新:
我意识到“回滚”的问题是由于我使用的两个 AMQ 都是通过代理网络相互连接的,而且我碰巧对源和目标使用了相同的队列名称。这导致应用程序将消息从一个 AMQ 传输到另一个 AMQ,然后立即,因为源 AMQ 上有一个消费者,消息将被传输回原始 AMQ,而原始 AMQ 又被视为我的应用程序发出新消息并再次传输,循环无限进行。下面发布的解决方案有助于解决其他问题。