我在 Spring Integration 5.4.4 中配置了一个从 AMQP 队列读取并写入 http 出站适配器的路由。例如,当我以编程方式为 http 出站适配器声明错误的 http 主机名(原因 java.net.UnknownHostException)时,我无法控制重试。
即使我在 amqpInboundAdapter 中配置了 RetryTemplate 逻辑,这似乎会生成无限重试(RabbitMQ 容器上未确认消息)。
我的目标应该是:在http出站适配器出错的情况下将消息重新排队N次,否则丢弃消息并且不要再次重新排队。
代码在这里:
Spring集成路线
public IntegrationFlow route(AmqpInboundChannelAdapterSMLCSpec amqpInboundChannelAdapterSMLCSpec) {
return IntegrationFlows
.from(amqpInboundChannelAdapterSMLCSpec)
.filter(validJsonFilter())
.enrichHeaders(h -> h.header("X-Insert-Key",utboundHttpConfig.outboundHttpToken))
.enrichHeaders(h -> h.header("Content-Encoding", "gzip"))
.enrichHeaders(h -> h.header("Content-Type", "application/json"))
.handle(Http.outboundChannelAdapter(outboundHttpConfig.outboundHttpUrl) .mappedRequestHeaders("X-Insert-Key")
.httpMethod(HttpMethod.POST)
)
.get();
}
AmqpInboundChannelAdapterSMLCSpec
public AmqpInboundChannelAdapterSMLCSpec gatewayEventInboundAmqpAdapter(ConnectionFactory connectionFactory) {
RetryTemplate retryTemplate = new RetryTemplate();
exceptionClassifierRetryPolicy.setPolicyMap(exceptionPolicy);
retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(1));
retryTemplate.setThrowLastExceptionOnExhausted(true);
return Amqp
.inboundAdapter(connectionFactory, rabbitConfig.inboundQueue())
.configureContainer(c -> c
.concurrentConsumers(3)
.maxConcurrentConsumers(5)
.receiveTimeout(2000)
.alwaysRequeueWithTxManagerRollback(false)
)
.retryTemplate(retryTemplate);
}
有任何想法吗?
非常感谢