1

我在 Spring Integration 5.4.4 中配置了一个从 AMQP 队列读取并写入 http 出站适配器的路由。例如,当我以编程方式为 http 出站适配器声明错误的 http 主机名(原因 java.net.UnknownHostException)时,我无法控制重试。

即使我在 amqpInboundAdapter 中配置了 RetryTemplate 逻辑,这似乎会生成无限重试(RabbitMQ 容器上未确认消息)。

我的目标应该是:在http出站适配器出错的情况下将消息重新排队N次,否则丢弃消息并且不要再次重新排队

代码在这里:

Spring集成路线

public IntegrationFlow route(AmqpInboundChannelAdapterSMLCSpec amqpInboundChannelAdapterSMLCSpec) {
        return IntegrationFlows
                .from(amqpInboundChannelAdapterSMLCSpec)
                .filter(validJsonFilter())
                .enrichHeaders(h -> h.header("X-Insert-Key",utboundHttpConfig.outboundHttpToken))
                .enrichHeaders(h -> h.header("Content-Encoding", "gzip"))
                .enrichHeaders(h -> h.header("Content-Type", "application/json"))
                .handle(Http.outboundChannelAdapter(outboundHttpConfig.outboundHttpUrl)                         .mappedRequestHeaders("X-Insert-Key")
                        .httpMethod(HttpMethod.POST)
                )
                .get();
    }

AmqpInboundChannelAdapterSMLCSpec

public AmqpInboundChannelAdapterSMLCSpec gatewayEventInboundAmqpAdapter(ConnectionFactory connectionFactory) {

        RetryTemplate retryTemplate = new RetryTemplate();

        exceptionClassifierRetryPolicy.setPolicyMap(exceptionPolicy);
        retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(1));
        retryTemplate.setThrowLastExceptionOnExhausted(true);

        return Amqp
                .inboundAdapter(connectionFactory, rabbitConfig.inboundQueue())
                .configureContainer(c -> c
                        .concurrentConsumers(3)
                        .maxConcurrentConsumers(5)
                        .receiveTimeout(2000)
                        .alwaysRequeueWithTxManagerRollback(false)
                )
                .retryTemplate(retryTemplate);
    }

有任何想法吗?

非常感谢

4

1 回答 1

1

如果http出站适配器出错,则将消息重新排队N次,否则丢弃该消息并且不再重新排队。

当您在 AMQP MessageListenerContainer 上使用重试时,会出现重新排队:重试在内存中完成,无需往返代理。

无论如何,您到目前为止所做的一切都还可以。只有您缺少的是RejectAndDontRequeueRecoverer配置它Amqp.inboundAdapter()来决定在所有重试尝试都用尽时如何处理 AMQP 消息。

不幸的是MessageRecoverer,通道适配器的直接配置已添加自版本5.5https ://docs.spring.io/spring-integration/docs/5.5.0-M3/reference/html/whats-new.html#x5.5-amqp .

对于当前版本,它必须通过recoveryCallback(RecoveryCallback<?> recoveryCallback)选项和相应的委托来完成:

   .recoveryCallback(context -> {
        org.springframework.amqp.core.Message messageToReject =
            (org.springframework.amqp.core.Message) RetrySynchronizationManager.getContext()
                    .getAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
        throw new ListenerExecutionFailedException("Retry Policy Exhausted",
                new AmqpRejectAndDontRequeueException(context.getLastThrowable()), messageToReject);
    }))
于 2021-03-26T13:27:16.543 回答