0

我有一个使用 Apache Camel 和 ActiveMQ 的 Spring Boot 应用程序。我通过 ProducerTemplate 发送 JMS 消息,消费者位于另一个应用程序中,但这两个应用程序使用同一个 ActiveMQ 代理(broker)。当我尝试发送消息时,会收到以下异常。

org.springframework.jms.listener.adapter.ReplyFailureException: Failed to send reply with payload

进一步查看日志后,我看到以下异常:

nested exception is javax.jms.InvalidDestinationException: Cannot determine response destination: Request message does not contain reply-to destination, and no default response destination set.
    at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:285) ~[spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:225) ~[spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) ~[spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) ~[spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) [spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) [spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1237) [spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227) [spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120) [spring-jms-5.3.7.jar:5.3.7]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_222]
Caused by: javax.jms.InvalidDestinationException: Cannot determine response destination: Request message does not contain reply-to destination, and no default response destination set.
    at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.getResponseDestination(AbstractAdaptableMessageListener.java:393) ~[spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.getResponseDestination(AbstractAdaptableMessageListener.java:366) ~[spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:281) ~[spring-jms-5.3.7.jar:5.3.7]
    ... 10 common frames omitted

以下是我正在使用的配置:

/**
 * JMS wiring for the application: the ActiveMQ connection factories, a
 * {@link JmsTemplate}, a listener-container factory, the Camel JMS component
 * registered as "jmsTransactional", and the listener containers that feed
 * "validationQueue" and "processQueue" messages to their consumers.
 */
@EnableJms
public class JmsAMQConfig {

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    /** Concurrency setting handed to the listener-container factory. */
    @Value("${jms.concurrent.connections}")
    private String numOfInstances;

    private final JtaTransactionManager transactionManager;
    private final ValidationQueueConsumer validationQueueConsumer;
    private final ProcessQueueConsumer processQueueConsumer;

    @Autowired
    public JmsAMQConfig(JtaTransactionManager transactionManager, ValidationQueueConsumer validationQueueConsumer, ProcessQueueConsumer processQueueConsumer) {
        this.transactionManager = transactionManager;
        this.validationQueueConsumer = validationQueueConsumer;
        this.processQueueConsumer = processQueueConsumer;
    }

    // ------------------------------------------------------------------
    // Connection factories
    // ------------------------------------------------------------------

    /** Raw ActiveMQ connection factory pointing at the configured broker URL. */
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory();
        amq.setBrokerURL(brokerUrl);
        amq.setTrustAllPackages(true);
        amq.setPrefetchPolicy(singleMessageQueuePrefetch());
        amq.setRedeliveryPolicy(exponentialRedelivery());
        amq.setUseAsyncSend(true);
        return amq;
    }

    /** Pooled wrapper around {@link #activeMQConnectionFactory()}; started and stopped with the bean. */
    @Bean(name = "jmsConnectionFactory", initMethod = "start", destroyMethod = "stop")
    public PooledConnectionFactory getJMSConnectionFactory() {
        PooledConnectionFactory pool = new PooledConnectionFactory();
        pool.setConnectionFactory(activeMQConnectionFactory());
        pool.setIdleTimeout(0);
        pool.setMaxConnections(8);
        pool.setMaximumActiveSessionPerConnection(500);
        return pool;
    }

    /** Caching connection factory layered on top of the pooled factory. */
    @Bean(name = "cachingFactory")
    public CachingConnectionFactory cachingFactory() {
        CachingConnectionFactory caching = new CachingConnectionFactory();
        caching.setTargetConnectionFactory(getJMSConnectionFactory());
        caching.setReconnectOnException(true);
        caching.setSessionCacheSize(15);
        return caching;
    }

    // ------------------------------------------------------------------
    // Spring JMS infrastructure
    // ------------------------------------------------------------------

    /** Transacted {@link JmsTemplate} using the custom message converter. */
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(activeMQConnectionFactory());
        template.setReceiveTimeout(2000);
        template.setSessionTransacted(true);
        template.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        template.setMessageConverter(customJmsMessageConverter());
        return template;
    }

    /** Listener-container factory bound to the injected transaction manager. */
    @Bean(name = "containerFactory")
    @DependsOn("transactionManager")
    public DefaultJmsListenerContainerFactory containerFactory() {
        DefaultJmsListenerContainerFactory listenerFactory = new DefaultJmsListenerContainerFactory();
        listenerFactory.setConcurrency(this.numOfInstances);
        listenerFactory.setSessionTransacted(true);
        listenerFactory.setTransactionManager(transactionManager);
        listenerFactory.setConnectionFactory(activeMQConnectionFactory());
        listenerFactory.setMessageConverter(customJmsMessageConverter());
        listenerFactory.setAutoStartup(true);
        listenerFactory.setPubSubDomain(true);
        // Listener failures are only printed to standard output.
        listenerFactory.setErrorHandler(t -> System.out.println("Exception" + t));
        return listenerFactory;
    }

    @Bean
    public MessageConverter customJmsMessageConverter() {
        return new CustomMessageConverter();
    }

    // ------------------------------------------------------------------
    // Camel JMS component
    // ------------------------------------------------------------------

    /** Camel JMS configuration: transacted, backed by the pooled connection factory. */
    @Bean(name = "pooledJmsConfiguration")
    @Primary
    public JmsConfiguration getJmsConfiguration() {
        JmsConfiguration camelJms = new JmsConfiguration();
        camelJms.setConnectionFactory(getJMSConnectionFactory());
        camelJms.setTransactionManager(transactionManager);
        camelJms.setTransacted(true);
        camelJms.setAcknowledgementModeName("TRANSACTED");
        camelJms.setConcurrentConsumers(30);
        camelJms.setMaxConcurrentConsumers(50);
        camelJms.setCacheLevelName("CACHE_NONE");
        return camelJms;
    }

    /** Camel component addressed as "jmsTransactional:" in endpoint URIs. */
    @Bean(name = "jmsTransactional")
    public JmsComponent getJmsComponent() {
        JmsComponent component = new JmsComponent();
        component.setConfiguration(getJmsConfiguration());
        return component;
    }

    // ------------------------------------------------------------------
    // Listener adapters and containers
    // ------------------------------------------------------------------

    @Bean(name = "validationQueueConsumerAdapter")
    public MessageListenerAdapter validationQueueConsumerAdapter() {
        return processMessageAdapter(validationQueueConsumer);
    }

    @Bean(name = "processQueueConsumerAdapter")
    public MessageListenerAdapter processQueueConsumerAdapter() {
        return processMessageAdapter(processQueueConsumer);
    }

    /** Container for "validationQueue": transacted session, three back-off attempts. */
    @Bean(name = "validationQueueConsumerListenerContainer")
    @DependsOn("transactionManager")
    public DefaultMessageListenerContainer validationQueueConsumerListenerContainer() {
        DefaultMessageListenerContainer container =
                queueListenerContainer("validationQueue", validationQueueConsumerAdapter());
        container.setSessionTransacted(true);
        container.setBackOff(getBackOffPolicy());
        return container;
    }

    /**
     * Container for "processQueue": one back-off attempt and a print-only error
     * handler. Unlike the validation container, the session is not marked as
     * transacted here.
     */
    @Bean(name = "processQueueConsumerListenerContainer")
    @DependsOn("transactionManager")
    public DefaultMessageListenerContainer processQueueConsumerListenerContainer() {
        DefaultMessageListenerContainer container =
                queueListenerContainer("processQueue", processQueueConsumerAdapter());
        container.setBackOff(fixedBackOff(1));
        container.setErrorHandler(t -> System.out.println("Exception" + t));
        return container;
    }

    // ------------------------------------------------------------------
    // Miscellaneous beans
    // ------------------------------------------------------------------

    @Bean(name = "cxf.default.workqueue")
    public AutomaticWorkQueueImpl automaticWorkQueue() {
        AutomaticWorkQueueImpl workQueue = new AutomaticWorkQueueImpl();
        workQueue.setName("default");
        workQueue.setQueueSize(512);
        return workQueue;
    }

    // ------------------------------------------------------------------
    // Private helpers
    // ------------------------------------------------------------------

    /** Builds an adapter that dispatches to the delegate's {@code processMessage} method. */
    private MessageListenerAdapter processMessageAdapter(Object delegate) {
        MessageListenerAdapter adapter = new MessageListenerAdapter();
        adapter.setDelegate(delegate);
        adapter.setDefaultListenerMethod("processMessage");
        return adapter;
    }

    /** Applies the settings shared by both queue listener containers. */
    private DefaultMessageListenerContainer queueListenerContainer(String queueName, MessageListenerAdapter listener) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(activeMQConnectionFactory());
        container.setDestinationName(queueName);
        container.setConcurrentConsumers(10);
        container.setMaxConcurrentConsumers(20);
        container.setAutoStartup(true);
        container.setSessionAcknowledgeModeName("CLIENT_ACKNOWLEDGE");
        container.setMessageListener(listener);
        container.setAcceptMessagesWhileStopping(true);
        return container;
    }

    /** At most two redeliveries, with exponential back-off and a multiplier of 2. */
    private RedeliveryPolicy exponentialRedelivery() {
        RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setMaximumRedeliveries(2);
        policy.setBackOffMultiplier(2.0);
        policy.setUseExponentialBackOff(true);
        return policy;
    }

    /** Queue prefetch of one message. */
    private ActiveMQPrefetchPolicy singleMessageQueuePrefetch() {
        ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
        policy.setQueuePrefetch(1);
        return policy;
    }

    /** Back-off used by the validation container: three attempts, 5000 ms apart. */
    private BackOff getBackOffPolicy() {
        return fixedBackOff(3);
    }

    /** Fixed 5000 ms interval back-off limited to {@code maxAttempts} attempts. */
    private BackOff fixedBackOff(long maxAttempts) {
        return new FixedBackOff(5000, maxAttempts);
    }

}

验证队列消费者:

/**
 * Receives message bodies through {@link #processMessage(String)} and forwards
 * each one to "processQueue" via the Camel "jmsTransactional" component.
 */
@Component
public class ValidationQueueConsumer {

    /** Endpoint the incoming body is forwarded to (fire-and-forget, persistent delivery). */
    private static final String PROCESS_QUEUE_URI =
            "jmsTransactional:queue:processQueue?requestTimeout=90000&explicitQosEnabled=true&deliveryPersistent=true&exchangePattern=InOnly";

    @Autowired
    CamelContext context;

    /** Shared template; created on first use instead of once per message. */
    private ProducerTemplate producerTemplate;

    /**
     * Forwards the given body asynchronously to "processQueue".
     *
     * <p>The method returns nothing, and the handle returned by
     * {@code asyncSendBody} is not inspected, so a failed send is not reported
     * to the caller.
     *
     * @param value message body to forward
     */
    public void processMessage(String value) {
        producerTemplate().asyncSendBody(PROCESS_QUEUE_URI, value);
    }

    /**
     * Returns the single template used by this consumer, creating it lazily.
     * Creating a new template for every message (as the previous code did)
     * leaves each one allocated and never stopped.
     * Synchronized because this method may be invoked from several listener
     * threads at once.
     */
    private synchronized ProducerTemplate producerTemplate() {
        if (producerTemplate == null) {
            producerTemplate = context.createProducerTemplate();
        }
        return producerTemplate;
    }

}

和 ProcessQueueConsumer :

/**
 * Receives message bodies through {@link #processMessage(String)} and forwards
 * each one, with a "numbers" header, to "remoteQueue" via the Camel
 * "jmsTransactional" component.
 */
@Component
public class ProcessQueueConsumer {

    /** Target endpoint; reply handling is disabled on the Camel side. */
    private static final String REMOTE_QUEUE_URI =
            "jmsTransactional:queue:remoteQueue?exchangePattern=InOnly&requestTimeout=90000&disableReplyTo=true";

    @Autowired
    CamelContext context;

    /** Shared template; created on first use instead of once (or twice) per message. */
    private ProducerTemplate producerTemplate;

    /**
     * Sends the given body to "remoteQueue" as an InOnly exchange.
     *
     * <p>The previous version passed an undefined identifier ({@code object})
     * as the body; the received {@code value} is what gets forwarded. It also
     * created two templates per call and used only one of them.
     *
     * @param value message body to forward
     */
    public void processMessage(String value) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("numbers", "1");

        producerTemplate().sendBodyAndHeaders(REMOTE_QUEUE_URI, ExchangePattern.InOnly, value, headers);
    }

    /**
     * Returns the single template used by this consumer, creating it lazily.
     * Synchronized because this method may be invoked from several listener
     * threads at once.
     */
    private synchronized ProducerTemplate producerTemplate() {
        if (producerTemplate == null) {
            producerTemplate = context.createProducerTemplate();
        }
        return producerTemplate;
    }

}

另一个应用程序中的消费者工作正常:它能够接收并处理消息。但是,我这边仍然收到以下异常:

nested exception is javax.jms.InvalidDestinationException: Cannot determine response destination: Request message does not contain reply-to destination, and no default response destination set.
    at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:285) ~[spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:225) ~[spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) ~[spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) ~[spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) [spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) [spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1237) [spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227) [spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120) [spring-jms-5.3.7.jar:5.3.7]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_222]
Caused by: javax.jms.InvalidDestinationException: Cannot determine response destination: Request message does not contain reply-to destination, and no default response destination set.
    at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.getResponseDestination(AbstractAdaptableMessageListener.java:393) ~[spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.getResponseDestination(AbstractAdaptableMessageListener.java:366) ~[spring-jms-5.3.7.jar:5.3.7]
    at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:281) ~[spring-jms-5.3.7.jar:5.3.7]
    ... 10 common frames omitted

看起来像是 processQueue(生产者)的 DefaultMessageListenerContainer 正在尝试发送回复,但找不到默认响应队列(default response queue)。

当我添加这一行 messageListenerAdapter.setDefaultResponseQueueName("processMessage"); 之后,异常不再出现,但是我们从 processQueue 发出的同一条消息也被发送到了 “processMessage” 队列:

@Bean(name = "processQueueConsumerAdapter")
public MessageListenerAdapter processQueueConsumerAdapter() {
    // Variant of the adapter that additionally names a default response queue.
    MessageListenerAdapter adapter = new MessageListenerAdapter();
    adapter.setDefaultListenerMethod("processMessage");
    adapter.setDelegate(processQueueConsumer);
    // Responses for requests without a reply-to destination go to this queue.
    adapter.setDefaultResponseQueueName("processMessage");
    return adapter;
}
4

0 回答 0