1

我一直在尝试让发布者确认打开的 Spring RabbitMQ 发布者。第一次调用正常并返回确认回调。从第二次通话开始,它失败并显示以下错误消息

    javax.servlet.ServletException: org.springframework.web.util.NestedServletException: Request processing failed; nested exception is org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Listener not registered: org.springframework.amqp.rabbit.core.RabbitTemplate@42802df2 []
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:169)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:564)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:317)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
    at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:128)
    at org.eclipse.jetty.util.thread.Invocable$InvocableExecutor.invoke(Invocable.java:222)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:294)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:199)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.web.util.NestedServletException: Request processing failed; nested exception is org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Listener not registered: org.springframework.amqp.rabbit.core.RabbitTemplate@42802df2 []
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:982)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:872)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:841)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:535)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1253)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1155)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:126)
    ... 15 common frames omitted
Caused by: org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Listener not registered: org.springframework.amqp.rabbit.core.RabbitTemplate@42802df2 []
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:83)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithDirect(RabbitTemplate.java:1650)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:1531)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:1320)
    at com.example.messagequeueintegration.impl.RabbitMQFacade.sendMessage(RabbitMQFacade.java:83)
    at com.example.messagequeueintegration.impl.RabbitMQFacade.sendMessage(RabbitMQFacade.java:68)
    at com.example.messagequeueintegration.impl.RabbitMQFacade.sendPaymentMessage(RabbitMQFacade.java:51)
    at com.example.messagequeueintegration.server.rest.controllers.SendMessageController.sendPaymentMessage(SendMessageController.java:35)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:116)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)
    ... 29 common frames omitted
Caused by: java.lang.IllegalArgumentException: Listener not registered: org.springframework.amqp.rabbit.core.RabbitTemplate@42802df2 []
    at org.springframework.util.Assert.notNull(Assert.java:134)
    at org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl.addPendingConfirm(PublisherCallbackChannelImpl.java:937)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:981)
    at com.sun.proxy.$Proxy47.addPendingConfirm(Unknown Source)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.setupConfirm(RabbitTemplate.java:2007)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:1979)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.exchangeMessages(RabbitTemplate.java:1732)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveAsListener(RabbitTemplate.java:1683)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithDirect(RabbitTemplate.java:1645)
    ... 48 common frames omitted

这就是我的配置的样子

@Configuration
public class RabbitMQConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(
                env.getRequiredProperty(QUEUE_HOST),
                env.getRequiredProperty(QUEUE_PORT, Integer.class));
        cachingConnectionFactory.setPublisherConfirms(true);
        cachingConnectionFactory.setPublisherReturns(true);
        return cachingConnectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }



    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }


    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public MessageUtil messageUtil(
            @Qualifier("messagequeueintegration") ObjectMapper objectMapper) {
        return new MessageUtil(objectMapper);
    }

    @Bean
    public BrokerFacade brokerFacade(
            @Qualifier("messagequeueintegration") ObjectMapper objectMapper,
            Environment environment,
            MessageUtil messageUtil) {
        return new RabbitMQFacade(objectMapper, environment, messageUtil);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE_KEY);
    }

    @Bean
    Binding queue2Binding(@Qualifier("queue2") Queue queue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue2).to(topicExchange)
                .with(env.getRequiredProperty("queue2name"));
    }

    @Bean
    Binding queue1Binding(@Qualifier("queue1") Queue queue1, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueq).to(topicExchange)
                .with(env.getRequiredProperty("queue1name"));
    }


    @Bean
    public Queue queue1() {

        return new Queue(
                env.getRequiredProperty("queue1name"),
                true,
                false,
                true;
    }

    @Bean
    public Queue queue2() {
        return new Queue(
                env.getRequiredProperty("queue2name"),
               true,
                false,
               true);
    }

我自动装配rabbittemplate并使用发布者确认回调调用它,如下所示

Message amqpMessage = new Message(message.getBytes(), new MessageProperties());
        rabbitTemplate.setConfirmCallback((cdata, ack, cause) -> {
            System.out.println("Confirm callback"+ ack +" for "+cdata.toString());
        });
        rabbitTemplate.setReturnCallback((msg,replyCode,replyText,
                exchange,  routingKey)->{
            System.out.println("call back returned for "+ routingKey);
        });
        rabbitTemplate.setMandatory(true);
        rabbitTemplate
                .sendAndReceive(TOPIC_EXCHANGE_KEY,environment.getProperty(correlationData.getMessageType().getQueueName()),
                        amqpMessage, createCorrelationData(correlationData));

经过大量调试后,我发现它失败了,因为模板没有注册为新实例中的侦听器,这些新实例PublisherCallbackChannelImpl是由第一个之后的后续调用创建的。我有点不知道这是配置问题还是发布者确认逻辑的问题。对此的任何见解都将非常有帮助。谢谢

编辑我正在使用 spring-amqp 版本 2.0.3-RELEASE 和 spring-messaging 版本 5.0.6-RELEASE

Edit2:第一次和第二次调用的调试日志 https://gist.github.com/jissjanardhanan/cbad51ba77fad3eda484d7c33c0b1517

4

1 回答 1

0
rabbitTemplate.setUseDirectReplyToContainer(false);

@see https://github.com/spring-projects/spring-amqp/issues/846#issuecomment-439072434

于 2020-10-15T03:56:26.613 回答