我最近从在我的 Spring Boot 应用程序中使用标准 Rabbit 模板更改为使用 Async Rabbit 模板。在这个过程中,我从标准send
方法切换到使用sendAndReceive
方法。
进行此更改似乎不会影响将消息发布到 RabbitMQ,但是我现在在发送消息时确实看到如下堆栈跟踪:
org.springframework.amqp.core.AmqpReplyTimeoutException: Reply timed out
at org.springframework.amqp.rabbit.AsyncRabbitTemplate$RabbitFuture$TimeoutTask.run(AsyncRabbitTemplate.java:762) [spring-rabbit-2.3.10.jar!/:2.3.10]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.3.9.jar!/:5.3.9]
我已经尝试修改各种设置,包括回复和接收超时,但所有这些更改都是接收上述错误所需的时间。我也尝试过设置useDirectReplyToContainer
为true
以及设置useChannelForCorrelation
为true
.
我已经设法使用在 docker 中运行的 RabbitMQ 代理在下面包含的主要方法中重新创建了该问题。
public static void main(String[] args) {
com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
cf.setUsername("<my-username>");
cf.setPassword("<my-password>");
cf.setVirtualHost("<my-vhost>");
ConnectionFactory connectionFactory = new CachingConnectionFactory(cf);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange("primary");
rabbitTemplate.setUseDirectReplyToContainer(true);
rabbitTemplate.setReceiveTimeout(10000);
rabbitTemplate.setReplyTimeout(10000);
rabbitTemplate.setUseChannelForCorrelation(true);
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
asyncRabbitTemplate.start();
System.out.printf("Async Rabbit Template Running? %b\n", asyncRabbitTemplate.isRunning());
MessageBuilderSupport<MessageProperties> props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId(UUID.randomUUID().toString())
.setHeader(PUBLISH_TIME_HEADER, Instant.now(Clock.systemUTC()).toEpochMilli())
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
asyncRabbitTemplate.sendAndReceive(
"1.1.1.csv-routing-key",
new Message(
"a,test,csv".getBytes(StandardCharsets.UTF_8),
props.build()
)
).addCallback(new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable ex) {
System.out.printf("Error sending message:\n%s\n", ex.getLocalizedMessage());
}
@Override
public void onSuccess(Message result) {
System.out.println("Message successfully sent");
}
});
}
我确信我只是缺少一个配置选项,但会提供任何帮助。
谢谢。:)