我正在将消息发布到 RabbitMQ 并且我想在 RabbitMQ 关闭时跟踪错误,为此我添加了一个RetryTemplate
带有恢复回调的消息,但恢复回调仅提供此方法getLastThrowable()
,我不确定如何提供详细信息RabbitMQ 关闭时失败的消息。(根据文档“RecoveryCallback
在重试上下文仅包含该
lastThrowable
字段时有所限制。对于更复杂的用例,您应该使用外部
RetryTemplate
,以便您可以RecoveryCallback
通过上下文的属性将附加信息传达给知道如何做到这一点,如果有人可以帮助我举一个很棒的例子。
兔子模板
public RabbitTemplate rabbitMqTemplate(RecoveryCallback publisherRecoveryCallback) {
RabbitTemplate r = new RabbitTemplate(rabbitConnectionFactory);
r.setExchange(exchangeName);
r.setRoutingKey(routingKey);
r.setConnectionFactory(rabbitConnectionFactory);
r.setMessageConverter(jsonMessageConverter());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
r.setRetryTemplate(retryTemplate);
r.setRecoveryCallback(publisherRecoveryCallback);
return r;
}
恢复回调
@Component
public class PublisherRecoveryCallback implements RecoveryCallback<AssortmentEvent> {
@Override
public AssortmentEvent recover(RetryContext context) throws Exception {
log.error("Error publising event",context.getLastThrowable());
//how to get message details here??
return null;
}
}
AMQP 出站适配器
return IntegrationFlows.from("eventsChannel")
.split()
.handle(Amqp.outboundAdapter(rabbitMqTemplate)
.exchangeName(exchangeName)
.confirmCorrelationExpression("payload")
.confirmAckChannel(ackChannel)
.confirmNackChannel(nackChannel)
)
.get();