6

我有以下 PHP 应用程序。这会将用户注册发布到消息队列。Java 应用程序从该队列中读取并导入它。希望下面的图表能够描述它。我只处理 Java 方面的事情。json 消息已经存在于队列中。

在此处输入图像描述

路线(Java 消费方)。

@Component
public class SignUpRouting {

  errorHandler(deadLetterChannel("rabbitmq://signUpDeadLetter.exchange?username=etc..").useOriginalMessage());

  from("rabbitmq://phpSignUp.exchange?username=etc....")
            .routeId("signUpRoute")
            .processRef("signUpProcessor")
            .end();
  //.... 

处理器..

@Component
public class SignupProcessor implements Processor {

    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public void process(Exchange exchange) throws Exception {

        String json = exchange.getIn().getBody(String.class);
        SignUpDto dto = mapper.readValue(json, SignUpDto.class);

        SignUp signUp = new SignUp();
        signUp.setWhatever(dto.getWhatever());
        //etc....

        // save record
        signUpDao.save(signUp);
    }
}

我的问题是这个..当处理器无法导入消息时我该怎么办。

例如,假设有一个 DAO 异常。数据字段可能过长或导入格式不正确。我不想丢失消息。我想查看错误并重试导入。但我不想每 30 秒重试一次消息。

我在想我需要创建另一个队列..一个死信队列并无限期地每 6 小时重试一次消息?.. 然后我会查看日志以查看错误并上传修复程序,然后重新处理消息?

我将如何实施?还是我走错了路?

编辑 我尝试设置 deadLetterExchange 以查看是否可以使事情朝着正确的方向发展...但是它出错并说队列不能为非空

 rabbitmq://phpSignUp.exchange?username=etc...&deadLetterExchange=signUpDeadLetter.exchange
4

2 回答 2

2

这是使用死信标题的示例:

        <from uri="rabbitmq://localhost/youexchange?queue=yourq1&amp;
            exchangeType=topic&amp;
            routingKey=user.reg.*&amp;
            deadLetterExchange=dead.msgs&amp;
            deadLetterExchangeType=topic&amp;
            deadLetterRoutingKey=dead.letters&amp;
            deadLetterQueue=dead.letters&amp;
            autoAck=false&amp;
            autoDelete=false"/>

          <!--We can use onException to make camel to retry, and after that, dead letter queue are the fallback-->
        <onException useOriginalMessage="true">
            <exception>java.lang.Exception</exception>
            <redeliveryPolicy asyncDelayedRedelivery="true" maximumRedeliveries="3" redeliveryDelay="5000"/>
        </onException>

我们需要关闭autoAck并设置deadLetterQueue,然后如果抛出异常,消息将在死信队列中。要使用onException,我们可以控制骆驼将消息放入死信队列之前的重试。

于 2015-10-23T13:29:55.523 回答
1

您可以使用 onException 来捕获异常,如果有异常,消息将被路由到死信交换,这是 Spring DSL 中的示例:

<onException useOriginalMessage="true">
            <exception>java.sql.SQLException</exception>
            <redeliveryPolicy asyncDelayedRedelivery="true" maximumRedeliveries="1" redeliveryDelay="1000"/>

            <inOnly uri="rabbitmq://localhost/dead.msgs?exchangeType=fanout&amp;
                    autoDelete=false&amp;
                    bridgeEndpoint=true"/>
</onException>
于 2015-10-22T04:13:03.627 回答