1

我正在将消息发布到 RabbitMQ 并且我想在 RabbitMQ 关闭时跟踪错误,为此我添加了一个RetryTemplate带有恢复回调的消息,但恢复回调仅提供此方法getLastThrowable(),我不确定如何提供详细信息RabbitMQ 关闭时失败的消息。(根据文档“RecoveryCallback在重试上下文仅包含该 lastThrowable字段时有所限制。对于更复杂的用例,您应该使用外部 RetryTemplate,以便您可以RecoveryCallback通过上下文的属性将附加信息传达给知道如何做到这一点,如果有人可以帮助我举一个很棒的例子。

兔子模板

public RabbitTemplate rabbitMqTemplate(RecoveryCallback publisherRecoveryCallback) {
    RabbitTemplate r = new RabbitTemplate(rabbitConnectionFactory);
    r.setExchange(exchangeName);
    r.setRoutingKey(routingKey);
    r.setConnectionFactory(rabbitConnectionFactory);
    r.setMessageConverter(jsonMessageConverter());

    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    r.setRetryTemplate(retryTemplate);
    r.setRecoveryCallback(publisherRecoveryCallback);
    return r;
    }

恢复回调

@Component
public class PublisherRecoveryCallback implements RecoveryCallback<AssortmentEvent> {
    @Override
    public AssortmentEvent recover(RetryContext context) throws Exception {
        log.error("Error publising event",context.getLastThrowable());
        //how to get message details here??
        return null;
    }
}

AMQP 出站适配器

return IntegrationFlows.from("eventsChannel") .split() .handle(Amqp.outboundAdapter(rabbitMqTemplate) .exchangeName(exchangeName) .confirmCorrelationExpression("payload") .confirmAckChannel(ackChannel) .confirmNackChannel(nackChannel) ) .get();

4

1 回答 1

1

这是不可能的,因为该函数RabbitTemplate.execute()已经不知道您发送的消息,因为它可以从任何其他方法执行,我们可能没有要处理的消息:

return this.retryTemplate.execute(
                    (RetryCallback<T, Exception>) context -> RabbitTemplate.this.doExecute(action, connectionFactory),
                    (RecoveryCallback<T>) this.recoveryCallback);

我建议你做的就像将消息存储到ThreadLocal之前发送并从你的 custom 那里获取它RecoveryCallback

于 2017-07-27T14:06:39.020 回答