弹簧整合 amqp 版本:5.0.11
目标:当发生致命异常时,消息将被丢弃。但不是致命的,消息将重新排队并执行重试策略。
但就我而言,我有一个自定义消息转换器,当我的转换发生一些非致命错误时,它将始终重新排队并且永远不会进入重试策略。
我尝试阅读代码,AmqpInboundChannelAdapter.Listener#onMessage
在重试之前,它转换消息,这意味着当消息转换发生一些错误时,它不会重试,将进入错误处理程序。
public void onMessage(final Message message, final Channel channel) throws Exception {
boolean retryDisabled = AmqpInboundChannelAdapter.this.retryTemplate == null;
try {
if (retryDisabled) {
createAndSend(message, channel);
}
else {
final org.springframework.messaging.Message<Object> toSend = createMessage(message, channel);
AmqpInboundChannelAdapter.this.retryTemplate.execute(context -> {
StaticMessageHeaderAccessor.getDeliveryAttempt(toSend).incrementAndGet();
setAttributesIfNecessary(message, toSend);
sendMessage(toSend);
return null;
},
(RecoveryCallback<Object>) AmqpInboundChannelAdapter.this.recoveryCallback);
}
}
我的代码如下:
@Bean
public IntegrationFlow EndpointMessageAndConvertModelDlxFlow(
@Qualifier("rabbitmqUnFatalExceptionRetryTemplate") RetryTemplate template,
ConnectionFactory factory,
EndpointCodeDelegatingMessageConverter converter) {
final MailRabbitmqProperties.Queue queue = getQueueConfig(ENDPOINT_BUFFER_DLX_NODE,
ENDPOINT_BUFFER_FUNCTION);
template.setRetryPolicy(endpointMessageExceptionClassifierRetryPolicy()); //fatal err not go retry
return IntegrationFlows.from(Amqp.inboundAdapter(factory, queue.getName())
.configureContainer(smlc -> {
smlc.acknowledgeMode(AcknowledgeMode.AUTO); //
smlc.defaultRequeueRejected(true); //requeue
final ConditionalRejectingErrorHandler errorHandler =
new ConditionalRejectingErrorHandler(
new EndPointMessageFatalExceptionStrategy());
smlc.errorHandler(errorHandler);
})
.retryTemplate(template)
.messageConverter(converter))
.channel(mailActionCreateTopicChannel())
.get();
}
我该如何解决这个问题,谢谢。