1

我最近从在我的 Spring Boot 应用程序中使用标准 Rabbit 模板更改为使用 Async Rabbit 模板。在这个过程中,我从标准send方法切换到使用sendAndReceive方法。

进行此更改似乎不会影响将消息发布到 RabbitMQ,但是我现在在发送消息时确实看到如下堆栈跟踪:

org.springframework.amqp.core.AmqpReplyTimeoutException: Reply timed out
    at org.springframework.amqp.rabbit.AsyncRabbitTemplate$RabbitFuture$TimeoutTask.run(AsyncRabbitTemplate.java:762) [spring-rabbit-2.3.10.jar!/:2.3.10]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.3.9.jar!/:5.3.9]

我已经尝试修改各种设置,包括回复和接收超时,但所有这些更改都是接收上述错误所需的时间。我也尝试过设置useDirectReplyToContainertrue以及设置useChannelForCorrelationtrue.
我已经设法使用在 docker 中运行的 RabbitMQ 代理在下面包含的主要方法中重新创建了该问题。

public static void main(String[] args) {
        com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
        cf.setHost("localhost");
        cf.setPort(5672);
        cf.setUsername("<my-username>");
        cf.setPassword("<my-password>");
        cf.setVirtualHost("<my-vhost>");

        ConnectionFactory connectionFactory = new CachingConnectionFactory(cf);

        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange("primary");
        rabbitTemplate.setUseDirectReplyToContainer(true);
        rabbitTemplate.setReceiveTimeout(10000);
        rabbitTemplate.setReplyTimeout(10000);
        rabbitTemplate.setUseChannelForCorrelation(true);

        AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
        asyncRabbitTemplate.start();

        System.out.printf("Async Rabbit Template Running? %b\n", asyncRabbitTemplate.isRunning());

        MessageBuilderSupport<MessageProperties> props = MessagePropertiesBuilder.newInstance()
            .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
            .setMessageId(UUID.randomUUID().toString())
            .setHeader(PUBLISH_TIME_HEADER, Instant.now(Clock.systemUTC()).toEpochMilli())
            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);

        asyncRabbitTemplate.sendAndReceive(
            "1.1.1.csv-routing-key",
            new Message(
                "a,test,csv".getBytes(StandardCharsets.UTF_8),
                props.build()
            )
        ).addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.printf("Error sending message:\n%s\n", ex.getLocalizedMessage());
            }

            @Override
            public void onSuccess(Message result) {
                System.out.println("Message successfully sent");
            }
        });
    }

我确信我只是缺少一个配置选项,但会提供任何帮助。

谢谢。:)

4

2 回答 2

1

asyncRabbitTemplate.sendAndReceive(..)将始终期望消息使用者的响应,因此您收到的超时。

要触发并忘记使用标准RabbitTemplate.send(...)并在 try/catch 块中捕获任何异常:

try {
        rabbitTemplate.send("1.1.1.csv-routing-key",
        new Message(
            "a,test,csv".getBytes(StandardCharsets.UTF_8),
            props.build());
    } catch (AmqpException ex) {
        log.error("failed to send rabbit message, routing key = {}", routingKey, ex);
    }
于 2022-01-14T17:36:28.843 回答
0

将回复超时设置为更大的数字并查看效果。

    rabbitTemplate.setReplyTimeout(60000);

https://docs.spring.io/spring-amqp/reference/html/#reply-timeout

于 2021-10-27T11:53:30.770 回答