我有一个运行 Apache Camel 和 ActiveMQ 的 Spring Boot 应用程序。我正在从 ProducerTemplate 发送 JMS 消息,而消费者在不同的应用程序中,但我对这两个应用程序使用相同的 ActiveMQ 代理。当我尝试发送消息时,我收到以下异常。
org.springframework.jms.listener.adapter.ReplyFailureException: Failed to send reply with payload
当我看得更远时,我得到以下异常:
nested exception is javax.jms.InvalidDestinationException: Cannot determine response destination: Request message does not contain reply-to destination, and no default response destination set.
at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:285) ~[spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:225) ~[spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) ~[spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) ~[spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) [spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) [spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1237) [spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227) [spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120) [spring-jms-5.3.7.jar:5.3.7]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_222]
Caused by: javax.jms.InvalidDestinationException: Cannot determine response destination: Request message does not contain reply-to destination, and no default response destination set.
at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.getResponseDestination(AbstractAdaptableMessageListener.java:393) ~[spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.getResponseDestination(AbstractAdaptableMessageListener.java:366) ~[spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:281) ~[spring-jms-5.3.7.jar:5.3.7]
... 10 common frames omitted
以下是我正在使用的配置:
@EnableJms
public class JmsAMQConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${jms.concurrent.connections}")
private String numOfInstances;
private final JtaTransactionManager transactionManager;
private final ValidationQueueConsumer validationQueueConsumer;
private final ProcessQueueConsumer processQueueConsumer;
@Autowired
public JmsAMQConfig(JtaTransactionManager transactionManager, ValidationQueueConsumer validationQueueConsumer, ProcessQueueConsumer processQueueConsumer) {
this.transactionManager = transactionManager;
this.validationQueueConsumer = validationQueueConsumer;
this.processQueueConsumer= processQueueConsumer;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerUrl);
activeMQConnectionFactory.setTrustAllPackages(true);
activeMQConnectionFactory.setPrefetchPolicy(customPrefetchPolicy());
activeMQConnectionFactory.setRedeliveryPolicy(customRedeliveryPolicy());
activeMQConnectionFactory.setUseAsyncSend(true);
return activeMQConnectionFactory;
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(activeMQConnectionFactory());
jmsTemplate.setReceiveTimeout(2000);
jmsTemplate.setSessionTransacted(true);
jmsTemplate.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
jmsTemplate.setMessageConverter(customJmsMessageConverter());
return jmsTemplate;
}
@Bean(name = "containerFactory")
@DependsOn("transactionManager")
public DefaultJmsListenerContainerFactory containerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConcurrency(this.numOfInstances);
factory.setSessionTransacted(true);
factory.setTransactionManager(transactionManager);
factory.setConnectionFactory(activeMQConnectionFactory());
factory.setMessageConverter(customJmsMessageConverter());
//factory.setErrorHandler();
factory.setAutoStartup(true);
factory.setPubSubDomain(true);
factory.setErrorHandler(t -> {
System.out.println("Exception" + t);
});
return factory;
}
private RedeliveryPolicy customRedeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(2);
redeliveryPolicy.setBackOffMultiplier(2.0);
redeliveryPolicy.setUseExponentialBackOff(true);
return redeliveryPolicy;
}
private ActiveMQPrefetchPolicy customPrefetchPolicy() {
ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
activeMQPrefetchPolicy.setQueuePrefetch(1);
return activeMQPrefetchPolicy;
}
@Bean
public MessageConverter customJmsMessageConverter() {
return new CustomMessageConverter();
}
@Bean(name = "pooledJmsConfiguration")
@Primary
public JmsConfiguration getJmsConfiguration() {
JmsConfiguration jmsConfiguration = new JmsConfiguration();
jmsConfiguration.setConnectionFactory(getJMSConnectionFactory());
jmsConfiguration.setTransactionManager(transactionManager);
jmsConfiguration.setTransacted(true);
jmsConfiguration.setAcknowledgementModeName("TRANSACTED");
jmsConfiguration.setConcurrentConsumers(30);
jmsConfiguration.setMaxConcurrentConsumers(50);
jmsConfiguration.setCacheLevelName("CACHE_NONE");
return jmsConfiguration;
}
@Bean(name = "jmsTransactional")
public JmsComponent getJmsComponent() {
JmsComponent jmsComponent = new JmsComponent();
jmsComponent.setConfiguration(getJmsConfiguration());
return jmsComponent;
}
@Bean(name = "jmsConnectionFactory", initMethod = "start", destroyMethod = "stop")
public PooledConnectionFactory getJMSConnectionFactory() {
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory());
pooledConnectionFactory.setIdleTimeout(0);
pooledConnectionFactory.setMaxConnections(8);
pooledConnectionFactory.setMaximumActiveSessionPerConnection(500);
return pooledConnectionFactory;
}
@Bean(name = "cachingFactory")
public CachingConnectionFactory cachingFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory(getJMSConnectionFactory());
cachingConnectionFactory.setReconnectOnException(true);
cachingConnectionFactory.setSessionCacheSize(15);
return cachingConnectionFactory;
}
@Bean(name = "validationQueueConsumerAdapter")
public MessageListenerAdapter validationQueueConsumerAdapter() {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
messageListenerAdapter.setDelegate(validationQueueConsumer);
messageListenerAdapter.setDefaultListenerMethod("processMessage");
return messageListenerAdapter;
}
@Bean(name = "processQueueConsumerAdapter")
public MessageListenerAdapter processQueueConsumerAdapter() {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
messageListenerAdapter.setDelegate(processQueueConsumer);
messageListenerAdapter.setDefaultListenerMethod("processMessage");
return messageListenerAdapter;
}
@Bean(name = "validationQueueConsumerListenerContainer")
@DependsOn("transactionManager")
public DefaultMessageListenerContainer validationQueueConsumerListenerContainer() {
DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setConnectionFactory(activeMQConnectionFactory());
defaultMessageListenerContainer.setDestinationName("validationQueue");
defaultMessageListenerContainer.setConcurrentConsumers(10);
defaultMessageListenerContainer.setMaxConcurrentConsumers(20);
defaultMessageListenerContainer.setAutoStartup(true);
defaultMessageListenerContainer.setSessionAcknowledgeModeName("CLIENT_ACKNOWLEDGE");
defaultMessageListenerContainer.setMessageListener(validationQueueConsumerAdapter());
defaultMessageListenerContainer.setAcceptMessagesWhileStopping(true);
defaultMessageListenerContainer.setSessionTransacted(true);
defaultMessageListenerContainer.setBackOff(getBackOffPolicy());
return defaultMessageListenerContainer;
}
@Bean(name = "processQueueConsumerListenerContainer")
@DependsOn("transactionManager")
public DefaultMessageListenerContainer processQueueConsumerListenerContainer() {
DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setConnectionFactory(activeMQConnectionFactory());
defaultMessageListenerContainer.setDestinationName("processQueue");
defaultMessageListenerContainer.setConcurrentConsumers(10);
defaultMessageListenerContainer.setMaxConcurrentConsumers(20);
defaultMessageListenerContainer.setAutoStartup(true);
defaultMessageListenerContainer.setSessionAcknowledgeModeName("CLIENT_ACKNOWLEDGE");
defaultMessageListenerContainer.setMessageListener(processQueueConsumerAdapter());
defaultMessageListenerContainer.setAcceptMessagesWhileStopping(true);
//defaultMessageListenerContainer.setSessionTransacted(true);
FixedBackOff fbo = new FixedBackOff();
fbo.setMaxAttempts(1);
fbo.setInterval(5000);
defaultMessageListenerContainer.setBackOff(fbo);
defaultMessageListenerContainer.setErrorHandler(t -> {
System.out.println("Exception" + t);
});
return defaultMessageListenerContainer;
}
@Bean(name = "cxf.default.workqueue")
public AutomaticWorkQueueImpl automaticWorkQueue() {
AutomaticWorkQueueImpl automaticWorkQueue = new AutomaticWorkQueueImpl();
automaticWorkQueue.setName("default");
automaticWorkQueue.setQueueSize(512);
return automaticWorkQueue;
}
private BackOff getBackOffPolicy() {
FixedBackOff fbo = new FixedBackOff();
fbo.setMaxAttempts(3);
fbo.setInterval(5000);
return fbo;
}
}
验证队列消费者:
@Component
public class ValidationQueueConsumer {
@Autowired
CamelContext context;
public void processMessage(String value) {
ProducerTemplate template = context.createProducerTemplate();
template.asyncSendBody("jmsTransactional:queue:processQueue?requestTimeout=90000&explicitQosEnabled=true&deliveryPersistent=true&exchangePattern=InOnly", value);
}
}
和 ProcessQueueConsumer :
@Component
public class ProcessQueueConsumer {
@Autowired
CamelContext context;
public void processMessage(String value) {
ProducerTemplate template = context.createProducerTemplate();
Map<String, Object> headers = new HashMap<>();
headers.put("numbers", "1");
context.createProducerTemplate().sendBodyAndHeaders(
"jmsTransactional:queue:remoteQueue?exchangePattern=InOnly&requestTimeout=90000&disableReplyTo=true", ExchangePattern.InOnly, object, headers);
}
}
不同应用程序中的使用者按预期工作。它正在接收消息并处理它。但是,我收到以下异常:
nested exception is javax.jms.InvalidDestinationException: Cannot determine response destination: Request message does not contain reply-to destination, and no default response destination set.
at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:285) ~[spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:225) ~[spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) ~[spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) ~[spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) [spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) [spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1237) [spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227) [spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120) [spring-jms-5.3.7.jar:5.3.7]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_222]
Caused by: javax.jms.InvalidDestinationException: Cannot determine response destination: Request message does not contain reply-to destination, and no default response destination set.
at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.getResponseDestination(AbstractAdaptableMessageListener.java:393) ~[spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.getResponseDestination(AbstractAdaptableMessageListener.java:366) ~[spring-jms-5.3.7.jar:5.3.7]
at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:281) ~[spring-jms-5.3.7.jar:5.3.7]
... 10 common frames omitted
它几乎看起来像 processQueue(生产者)的 DefaultMessageListenerContainer 正在尝试进行回复,但没有找到 defaultresponseQueue。
当我添加以下行messageListenerAdapter.setDefaultResponseQueueName("processMessage"); ,那么没有例外,但是我们从processQueue产生的相同消息也被发送到“ processMessage ”队列
@Bean(name = "processQueueConsumerAdapter")
public MessageListenerAdapter processQueueConsumerAdapter() {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
messageListenerAdapter.setDelegate(processQueueConsumer);
messageListenerAdapter.setDefaultListenerMethod("processMessage");
messageListenerAdapter.setDefaultResponseQueueName("processMessage");
return messageListenerAdapter;
}