0

我正在尝试在 2 个不同的远程 activeMQ 代理之间移动 jms 消息,并且经过大量阅读

我正在使用 Atomikos,因为我正在编写一个独立的应用程序,并且我也在使用 spring 来使整个工作正常。

我有以下 bean javaconfig 设置

@Bean(name="atomikosSrcConnectionFactory")
    public AtomikosConnectionFactoryBean consumerXAConnectionFactory() {
        AtomikosConnectionFactoryBean consumerBean = new AtomikosConnectionFactoryBean();
        consumerBean.setUniqueResourceName("atomikosSrcConnectionFactory");
        consumerBean.setLocalTransactionMode(false);
        return consumerBean;
    }

    @Bean(name="atomikosDstConnectionFactory")
    public AtomikosConnectionFactoryBean producerXAConnectionFactory() {
        AtomikosConnectionFactoryBean producerBean = new AtomikosConnectionFactoryBean();
        producerBean.setUniqueResourceName("atomikosDstConnectionFactory");
        producerBean.setLocalTransactionMode(false);
        return producerBean;
    }

    @Bean(name="jtaTransactionManager")
    public JtaTransactionManager jtaTransactionManager() throws SystemException {
        JtaTransactionManager jtaTM = new JtaTransactionManager();
        jtaTM.setTransactionManager(userTransactionManager());
        jtaTM.setUserTransaction(userTransactionImp());
        return jtaTM;
    }

    @Bean(initMethod="init", destroyMethod="close", name="userTransactionManager")
    public UserTransactionManager userTransactionManager() {
        UserTransactionManager utm = new UserTransactionManager();
        utm.setForceShutdown(false);
        return utm;
    }

    @Bean(name="userTransactionImp")
    public UserTransactionImp userTransactionImp() throws SystemException {
        UserTransactionImp uti = new UserTransactionImp();
        uti.setTransactionTimeout(300);
        return uti;
    }

    @Bean(name="jmsContainer")
    @Lazy(value=true)
    public DefaultMessageListenerContainer jmsContainer() throws SystemException {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setAutoStartup(false);
        dmlc.setTransactionManager(jtaTransactionManager());
        dmlc.setSessionTransacted(true);
        dmlc.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        dmlc.setConnectionFactory(consumerXAConnectionFactory());
        dmlc.setDestinationName("srcQueue");
        return dmlc;
    }

    @Bean(name="transactedJmsTemplate")
    public JmsTemplate transactedJmsTemplate() {

        DynamicDestinationResolver dest = new DynamicDestinationResolver();

        JmsTemplate jmsTmp = new JmsTemplate(producerXAConnectionFactory());

        jmsTmp.setDeliveryPersistent(true);
        jmsTmp.setSessionTransacted(true);
        jmsTmp.setDestinationResolver(dest);
        jmsTmp.setPubSubDomain(false);
        jmsTmp.setReceiveTimeout(20000);
        jmsTmp.setExplicitQosEnabled(true);
        jmsTmp.setSessionTransacted(true);
        jmsTmp.setDefaultDestination(new ActiveMQQueue("destQueue"));
        jmsTmp.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

        return jmsTmp;
    }

在我启动 DMLC 之前,2 AtomikosConnectionFactoryBean 在运行时包装了一个 ActiveMQXAConnectionFactory(每个代理一个)。

然后,我使用以下方法设置了一个简单的 messageListener(在 dmlc 启动之前分配给它):

public void onMessage(Message message) {
    final Message rcvedMsg = message;

    try{
        MessageCreator msgCreator = new MessageCreator(){
                public Message createMessage(Session session) throws JMSException{
                    Message returnMsg = null;
                    if(rcvedMsg instanceof TextMessage){
                        TextMessage txtMsg = session.createTextMessage();
                        txtMsg.setText(((TextMessage) rcvedMsg).getText());
                        returnMsg = txtMsg;
                    }
                    else if(rcvedMsg instanceof BytesMessage){
                        BytesMessage bytesMsg = session.createBytesMessage();
                        if(!(((BytesMessage) rcvedMsg).getBodyLength() > Integer.MAX_VALUE)){
                            byte[] bodyContent = new byte[(int) ((BytesMessage) rcvedMsg).getBodyLength()];
                            bytesMsg.writeBytes(bodyContent);
                            returnMsg = bytesMsg;
                        }
                    }
                    return returnMsg;
                }
            };

            jmsTemplate.send(msgCreator);
    }
    catch(JmsException | JMSException e){
        logger.error("Error when transfering message: '{}'. {}",message,e);
    }
}

应用程序在没有任何特定错误或警告的情况下启动,但是一旦我将消息放入源队列中,我可以通过日志看到 onMessage 方法正在一遍又一遍地运行同一条消息,就好像事务一直在回滚并再次重新启动(不会在任何地方抛出错误)。

我还注意到,如果我碰巧使用相同的源 url 和目标 url(意味着相同的代理,但每个代理都有自己的 connectionFactory),它会工作并且消息会按预期在源队列和目标队列之间传输。

我想知道的是

  1. 我在设置中做错了什么?为什么我的事务“似乎”在使用 2 个不同的代理时一遍又一遍地回滚,但在使用相同的代理(但超过 2 个不同的连接工厂)时工作?
  2. 我不完全相信 onMessage 当前正在执行正确的事务,因为我目前正在捕获所有异常并且什么都不做,我相信这将在 jmstemplate 完成发送消息之前提交 dmlc 的事务,但我不确定。如果是这种情况,SessionAwareMessageListener 会更好吗?我应该在 onMessage 方法中设置@Transacted 吗?

任何人都可以帮助阐明这个问题吗?欢迎所有输入。

更新:

我意识到“回滚”的问题是由于我使用的两个 AMQ 都是通过代理网络相互连接的,而且我碰巧对源和目标使用了相同的队列名称。这导致应用程序将消息从一个 AMQ 传输到另一个 AMQ,然后立即,因为源 AMQ 上有一个消费者,消息将被传输回原始 AMQ,而原始 AMQ 又被视为我的应用程序发出新消息并再次传输,循环无限进行。下面发布的解决方案有助于解决其他问题。

4

1 回答 1

0
try {
   ... Code
} catch (JmsException je) {
    logger.error("Error when transfering message: '{}'. {}",message,e);
}

上面的代码正在吞噬异常,您不应该捕获异常或重新抛出异常,以便事务管理可以适当地处理它。目前没有看到异常,执行了可能导致奇怪结果的提交。

我会做以下类似的事情,JmsException来自 Spring,并且作为 Spring 中的大多数例外,一个RuntimeException. 只需重新启动,也可以记录异常堆栈跟踪,正确删除{}日志语句中的第二个。

try {
   ... Code
} catch (JmsException je) {
    logger.error("Error when transfering message: '{}'.",message,e);
    throw je;
}

但是,这将重复日志记录,因为 Spring 也会记录错误。

对于JMSException做这样的事情,将其转换为JmsException.

try {
   ... Code
} catch (JMSException je) {
    logger.error("Error when transfering message: '{}'.",message,e);
    throw JmsUtils.convertJmsAccessException(je);
}

要获得有关发生情况的更多信息,您可能需要为org.springframework.jms包启用调试日志记录。这将使您深入了解发送/接收消息时发生的情况。

您使用事务会话和手动确认消息的另一件事,但是您不在message.acknowledge()代码中执行。由于 JTA 事务,Spring 不会调用它。尝试将其切换到SESSION_TRANSACTED。至少对于DMLC.

于 2014-02-27T15:38:03.753 回答