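I have been trying to get a Spring RabbitMQ publisher working with publisher confirms enabled. The first call works and the confirm callback is invoked. From the second call onwards, it fails with the following error message: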
javax.servlet.ServletException: org.springframework.web.util.NestedServletException: Request processing failed; nested exception is org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Listener not registered: org.springframework.amqp.rabbit.core.RabbitTemplate@42802df2 []
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:169)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
at org.eclipse.jetty.server.Server.handle(Server.java:564)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:317)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:128)
at org.eclipse.jetty.util.thread.Invocable$InvocableExecutor.invoke(Invocable.java:222)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:294)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:199)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.web.util.NestedServletException: Request processing failed; nested exception is org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Listener not registered: org.springframework.amqp.rabbit.core.RabbitTemplate@42802df2 []
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:982)
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:872)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:841)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:535)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1253)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1155)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:126)
... 15 common frames omitted
Caused by: org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Listener not registered: org.springframework.amqp.rabbit.core.RabbitTemplate@42802df2 []
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:83)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithDirect(RabbitTemplate.java:1650)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:1531)
at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:1320)
at com.example.messagequeueintegration.impl.RabbitMQFacade.sendMessage(RabbitMQFacade.java:83)
at com.example.messagequeueintegration.impl.RabbitMQFacade.sendMessage(RabbitMQFacade.java:68)
at com.example.messagequeueintegration.impl.RabbitMQFacade.sendPaymentMessage(RabbitMQFacade.java:51)
at com.example.messagequeueintegration.server.rest.controllers.SendMessageController.sendPaymentMessage(SendMessageController.java:35)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:116)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)
... 29 common frames omitted
Caused by: java.lang.IllegalArgumentException: Listener not registered: org.springframework.amqp.rabbit.core.RabbitTemplate@42802df2 []
at org.springframework.util.Assert.notNull(Assert.java:134)
at org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl.addPendingConfirm(PublisherCallbackChannelImpl.java:937)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:981)
at com.sun.proxy.$Proxy47.addPendingConfirm(Unknown Source)
at org.springframework.amqp.rabbit.core.RabbitTemplate.setupConfirm(RabbitTemplate.java:2007)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:1979)
at org.springframework.amqp.rabbit.core.RabbitTemplate.exchangeMessages(RabbitTemplate.java:1732)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveAsListener(RabbitTemplate.java:1683)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithDirect(RabbitTemplate.java:1645)
... 48 common frames omitted
This is what my configuration looks like:
@Configuration
public class RabbitMQConfig {
@Autowired
private Environment env;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(
env.getRequiredProperty(QUEUE_HOST),
env.getRequiredProperty(QUEUE_PORT, Integer.class));
cachingConnectionFactory.setPublisherConfirms(true);
cachingConnectionFactory.setPublisherReturns(true);
return cachingConnectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public MessageUtil messageUtil(
@Qualifier("messagequeueintegration") ObjectMapper objectMapper) {
return new MessageUtil(objectMapper);
}
@Bean
public BrokerFacade brokerFacade(
@Qualifier("messagequeueintegration") ObjectMapper objectMapper,
Environment environment,
MessageUtil messageUtil) {
return new RabbitMQFacade(objectMapper, environment, messageUtil);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE_KEY);
}
@Bean
Binding queue2Binding(@Qualifier("queue2") Queue queue2, TopicExchange topicExchange) {
return BindingBuilder.bind(queue2).to(topicExchange)
.with(env.getRequiredProperty("queue2name"));
}
@Bean
Binding queue1Binding(@Qualifier("queue1") Queue queue1, TopicExchange topicExchange) {
return BindingBuilder.bind(queue1).to(topicExchange)
.with(env.getRequiredProperty("queue1name"));
}
@Bean
public Queue queue1() {
return new Queue(
env.getRequiredProperty("queue1name"),
true,
false,
true);
}
@Bean
public Queue queue2() {
return new Queue(
env.getRequiredProperty("queue2name"),
true,
false,
true);
}
}
I autowire the RabbitTemplate and call it with the publisher confirm callback as follows:
Message amqpMessage = new Message(message.getBytes(), new MessageProperties());
rabbitTemplate.setConfirmCallback((cdata, ack, cause) -> {
System.out.println("Confirm callback"+ ack +" for "+cdata.toString());
});
rabbitTemplate.setReturnCallback((msg,replyCode,replyText,
exchange, routingKey)->{
System.out.println("call back returned for "+ routingKey);
});
rabbitTemplate.setMandatory(true);
rabbitTemplate
.sendAndReceive(TOPIC_EXCHANGE_KEY,environment.getProperty(correlationData.getMessageType().getQueueName()),
amqpMessage, createCorrelationData(correlationData));
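After a lot of debugging, I found that it fails because the template is not registered as a listener on the new PublisherCallbackChannelImpl instances that are created by the calls after the first one. I am not sure whether this is a configuration problem or a problem in the publisher confirm logic. Any insight would be very helpful. Thanks.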
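To make that suspicion concrete, here is a toy model in plain Java of the failure mode as I understand it. It is not Spring AMQP's actual implementation: `ToyChannel`, `trySend` and `ListenerRegistrationSketch` are hypothetical names, and the only thing borrowed from the real library is the idea that a channel refuses to record a pending confirm for a listener that was never registered on that particular channel instance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ListenerRegistrationSketch {

    // Hypothetical stand-in for a confirm-capable channel (NOT Spring's PublisherCallbackChannelImpl).
    public static class ToyChannel {
        // Pending confirms are tracked per registered listener.
        private final Map<Object, List<String>> pendingByListener = new HashMap<>();

        // Registers a listener on this channel instance only.
        public void addListener(Object listener) {
            pendingByListener.putIfAbsent(listener, new ArrayList<>());
        }

        // Rejects listeners that were never registered on this channel.
        public void addPendingConfirm(Object listener, String correlationId) {
            List<String> pending = pendingByListener.get(listener);
            if (pending == null) {
                throw new IllegalArgumentException("Listener not registered: " + listener);
            }
            pending.add(correlationId);
        }

        // Number of confirms currently recorded for the given listener.
        public int pendingCount(Object listener) {
            List<String> pending = pendingByListener.get(listener);
            return pending == null ? 0 : pending.size();
        }
    }

    // Returns true if the pending confirm was recorded, false if the listener was unknown.
    public static boolean trySend(ToyChannel channel, Object listener, String correlationId) {
        try {
            channel.addPendingConfirm(listener, correlationId);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Object template = new Object(); // stands in for the RabbitTemplate

        // First call: the listener is registered before the send.
        ToyChannel firstChannel = new ToyChannel();
        firstChannel.addListener(template);
        System.out.println("first call ok: " + trySend(firstChannel, template, "msg-1"));

        // Second call: a fresh channel is created and the registration step is skipped.
        ToyChannel secondChannel = new ToyChannel();
        System.out.println("second call ok: " + trySend(secondChannel, template, "msg-2"));
    }
}
```

In this model the second send can only succeed if something registers the template on every newly created channel, which is the step that appears to be missing in my case.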
Edit: I am using spring-amqp version 2.0.3-RELEASE and spring-messaging version 5.0.6-RELEASE.
Edit 2: Debug logs for the first and second calls: https://gist.github.com/jissjanardhanan/cbad51ba77fad3eda484d7c33c0b1517