0

鉴于我有 IntegrationFlow

IntegrationFlows.from(
        Amqp.inboundAdapter(rabbitConnectionFactory, QUEUE)
                .messageConverter(new MarshallingMessageConverter(xmlMarshaller))
                .defaultRequeueRejected(false)
                .concurrentConsumers(2)
                .maxConcurrentConsumers(4)
                .channelTransacted(true)
                .errorHandler(new ConditionalRejectingErrorHandler())
)
        .log(INFO, AMQP_LOGGER_CATEGORY)
        .publishSubscribeChannel(s -> s
                .subscribe(f -> f
                        .handle(deathCheckHandler))
                .subscribe(f -> f.handle(service))
        )
        .get();

deathCheckHandler在哪里

@Component
public class DeathCheckHandler {

    private static final Logger logger = LoggerFactory.getLogger(lookup().lookupClass());

    private static final int RETRY_COUNT = 3;
    private final RabbitTemplate rabbitTemplate;
    private final Jaxb2Marshaller xmlMarshaller;

    public DeathCheckHandler(RabbitTemplate rabbitTemplate, Jaxb2Marshaller xmlMarshaller) {
        this.rabbitTemplate = rabbitTemplate;
        this.xmlMarshaller = xmlMarshaller;
    }

    @ServiceActivator
    public void check(Message<?> message) {
        MessageHeaders headers = message.getHeaders();

        Optional<XDeath> rejected = findAnyRejectedXDeathMessageHeader(headers);
        if (rejected.isPresent()) {
            int rejectedCount = rejected.get().getCount();
            logger.debug("Rejected count is {}", rejectedCount);
            if (rejectedCount > RETRY_COUNT) {
                parkMessage(message);
            }
        }
    }

    private void parkMessage(Message<?> message) {
        Object payload = message.getPayload();
        MessageHeaders headers = message.getHeaders();
        String parkingExchange = (String) headers.get("amqp_receivedExchange");
        String parkingRoutingKey = ((String) headers.get("amqp_consumerQueue")).replace("queue", "plq");
        rabbitTemplate.setMessageConverter(new MarshallingMessageConverter(xmlMarshaller));
        logger.warn("Tried more than {} times. Parking rejected message: {} to exchange {} and routing key {}", RETRY_COUNT, payload, parkingExchange, parkingRoutingKey);
        rabbitTemplate.convertAndSend(parkingExchange, parkingRoutingKey, payload);
        // cause the message to be acknowledged and not routed to DLQ
        throw new ImmediateAcknowledgeAmqpException("Give up retrying message: " + payload);
    }
}

DeathCheckHandler处理在 AMQP 队列上设置的死信。

如何以不正确的格式(即MarshallingMessageConverterthrows时)停放 XML 消息UnmarshallingFailureException

我想以类似的方式停放它DeathCheckHandler#parkMessage

应该是可能的ConditionalRejectingErrorHandler,但我不知道如何。

4

1 回答 1

1

克隆ConditionalRejectingErrorHandler.

将此方法用作模板...

@Override
public void handleError(Throwable t) {
    log(t);
    if (!this.causeChainContainsARADRE(t) && this.exceptionStrategy.isFatal(t)) {
        if (this.discardFatalsWithXDeath && t instanceof ListenerExecutionFailedException) {
            Message failed = ((ListenerExecutionFailedException) t).getFailedMessage();
            if (failed != null) {
                List<Map<String, ?>> xDeath = failed.getMessageProperties().getXDeathHeader();
                if (xDeath != null && xDeath.size() > 0) {
                    this.logger.error("x-death header detected on a message with a fatal exception; "
                            + "perhaps requeued from a DLQ? - discarding: " + failed);
                    throw new ImmediateAcknowledgeAmqpException("Fatal and x-death present");
                }
            }
        }
        throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", this.rejectManual,
                t);
    }
}

默认情况下,带有 x-death 标头的致命异常会通过ImmediateAcknowledgeAmqpException.

子类化和覆盖这个方法并不容易,因为这些字段是私有的,所以最容易复制这个类(并在抛出 IAAE 之前发布到停车场)。

我将对此类进行一些改进,以使其更易于自定义/覆盖。

拉取请求

于 2020-09-17T18:05:37.877 回答