8

我正在寻找一种使用 spring amqp 和 Rabbit MQ 使用退避策略实现重试的好方法,但要求是不应阻塞侦听器(因此可以自由处理其他消息)。我在这里看到了一个类似的问题,但它不包括“退出”的解决方案:

RabbitMQ 和 Spring amqp 重试而不阻塞消费者

我的问题是:

  1. 重试时默认的 spring-retry 实现会阻塞线程吗?github中的实现表明确实如此。

  2. 如果上述假设成立,那么实现此目的的唯一方法是实现一个单独的重试队列(DLQ?),并为每条消息设置一个 TTL(假设我们不想在退避间隔内阻塞线程)。

  3. 如果我们采用上述方法(DLQ 或单独的队列),每次重试尝试不需要单独的队列吗?如果我们只使用 1 个队列进行重试,同一个队列将包含 TTL 从最小重试间隔到最大重试间隔的消息,如果队列前面的消息具有最大 TTL,则它后面的消息不会即使它有最小 TTL 也被捡起。这是这里的 Rabbit MQ TTL 文档(请参阅注意事项):

  4. 是否有另一种方法来实现非阻塞退避重试机制?

添加一些配置信息以帮助解决@garyrusel:

队列配置:

    <rabbit:queue name="regular_requests_queue"/>
    <rabbit:queue name="retry_requests_queue">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="regular_exchange" />
        </rabbit:queue-arguments>
    </rabbit:queue>

    <rabbit:direct-exchange name="regular_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="regular_requests_queue" key="regular-request-key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:direct-exchange name="retry_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="retry_requests_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <bean id="retryRecoverer" class="com.testretry.RetryRecoverer">
         <constructor-arg ref="retryTemplate"/>
         <constructor-arg value="retry_exchange"/>
    </bean>

    <rabbit:template id="templateWithOneRetry" connection-factory="connectionFactory" exchange="regular_exchange" retry-template="retryTemplate"/>
    <rabbit:template id="retryTemplate" connection-factory="connectionFactory" exchange="retry_exchange"/>

    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <property name="maxAttempts" value="1"/>
            </bean>
        </property>
    </bean>
4

3 回答 3

0

你有没有看过rabbitmq延迟插件,它在交换而不是队列中延迟消息?根据文档,发送到延迟器交换的消息似乎在交换级别是持久的。

使用自定义重试计数消息头和延迟器交换,我们可以在没有这些中间队列、dlx 和模板组合的丑陋的情况下实现非阻塞行为

https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

于 2016-05-28T10:16:26.720 回答
0

这是我最终实施的最终解决方案。每个“重试间隔”有 1 个队列,每个重试队列有 1 个交换。它们都被传递给创建恢复器列表的自定义 RepublishRecoverer。

一个名为“RetryCount”的自定义标头被添加到消息中,并且根据“RetryCount”的值,消息以不同的“过期时间”发布到正确的交换/队列。每个重试队列都设置有一个 DLX,它设置为“regular_exchange”(即请求进入常规队列)。

<rabbit:template id="genericTemplateWithRetry" connection-factory="connectionFactory" exchange="regular_exchange" retry-template="retryTemplate"/>

<!-- Create as many templates as retryAttempts (1st arg) in customRetryTemplate-->
<rabbit:template id="genericRetryTemplate1" connection-factory="consumerConnFactory" exchange="retry_exchange_1"/>
<rabbit:template id="genericRetryTemplate2" connection-factory="consumerConnFactory" exchange="retry_exchange_2"/>
<rabbit:template id="genericRetryTemplate3" connection-factory="consumerConnFactory" exchange="retry_exchange_3"/>
<rabbit:template id="genericRetryTemplate4" connection-factory="consumerConnFactory" exchange="retry_exchange_4"/>
<rabbit:template id="genericRetryTemplate5" connection-factory="consumerConnFactory" exchange="retry_exchange_5"/>

<rabbit:queue name="regular_requests_queue"/>

<!-- Create as many queues as retryAttempts (1st arg) in customRetryTemplate -->
<rabbit:queue name="retry_requests_queue_1">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="retry_requests_queue_2">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="retry_requests_queue_3">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="retry_requests_queue_4">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="retry_requests_queue_5">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="regular_exchange" />
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:direct-exchange name="regular_exchange">
    <rabbit:bindings>
        <rabbit:binding queue="regular_requests_queue" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<!-- Create as many exchanges as retryAttempts (1st arg) in customRetryTemplate -->
<rabbit:direct-exchange name="retry_exchange_1">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_1" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="retry_exchange_2">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_2" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="retry_exchange_3">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_3" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="retry_exchange_4">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_4" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:direct-exchange name="retry_exchange_5">
    <rabbit:bindings>
        <rabbit:binding queue="retry_requests_queue_5" key="v1-regular-request"/>
    </rabbit:bindings>
</rabbit:direct-exchange>


<!-- retry config begin -->
<!-- Pass in all templates and exchanges created as list/array arguments below -->
<bean id="customRetryRecoverer" class="com.test.listeners.CustomRetryRecoverer">
    <!-- Pass in list of templates -->
     <constructor-arg>
        <list>
            <ref bean="genericRetryTemplate1"/>
            <ref bean="genericRetryTemplate2"/>
            <ref bean="genericRetryTemplate3"/>
            <ref bean="genericRetryTemplate4"/>
            <ref bean="genericRetryTemplate5"/>
        </list>
     </constructor-arg>
     <!-- Pass in array of exchanges -->
     <constructor-arg value="retry_exchange_1,retry_exchange_2,retry_exchange_3,retry_exchange_4,retry_exchange_5"/>
     <constructor-arg ref="customRetryTemplate"/>
</bean>

<bean id="retryInterceptor"
      class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="customRetryRecoverer"/>
    <property name="retryOperations" ref="retryTemplate"/>
    <property name="messageKeyGenerator" ref="msgKeyGenerator"/>
</bean>
    
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <!--  Set to 1 - just for the initial attempt -->
            <property name="maxAttempts" value="1"/>
        </bean>
    </property>
</bean>

 <bean id="customRetryTemplate" class="com.test.retry.CustomRetryTemplate">
    <constructor-arg value="5"/> <!-- max attempts -->
    <constructor-arg value="3000"/> <!-- Initial interval -->
    <constructor-arg value="5"/> <!-- multiplier for backoff -->
</bean>

<!-- retry config end -->

这是 CustomRetryRecoverer 的代码:

public class CustomRetryRecoverer extends
        RepublishMessageRecoverer {

    private static final String RETRY_COUNT_HEADER_NAME = "RetryCount";
    private List<RepublishMessageRecoverer> retryExecutors = new ArrayList<RepublishMessageRecoverer>();
    private TriggersRetryTemplate retryTemplate;
    
    public TriggersRetryRecoverer(AmqpTemplate[] retryTemplates, String[] exchangeNames, TriggersRetryTemplate retryTemplate) {
        super(retryTemplates[0], exchangeNames[0]);
        this.retryTemplate = retryTemplate;

        //Get lower of the two array sizes
        int executorCount = (exchangeNames.length < retryTemplates.length) ? exchangeNames.length : retryTemplates.length;
        for(int i=0; i<executorCount; i++) {
            createRetryExecutor(retryTemplates[i], exchangeNames[i]);
        }
        //If not enough exchanges/templates provided, reuse the last exchange/template for the remaining retry recoverers
        if(retryTemplate.getMaxRetryCount() > executorCount) {
            for(int i=executorCount; i<retryTemplate.getMaxRetryCount(); i++) {
                createRetryExecutor(retryTemplates[executorCount-1], exchangeNames[executorCount-1]);
            }
        }
    }

    @Override
    public void recover(Message message, Throwable cause) {
        
        if(getRetryCount(message) < retryTemplate.getMaxRetryCount()) {
            incrementRetryCount(message);
            
            //Set the expiration of the retry message
            message.getMessageProperties().setExpiration(String.valueOf(retryTemplate.getNextRetryInterval(getRetryCount(message)).longValue()));
            
            RepublishMessageRecoverer retryRecoverer = null;
            if(getRetryCount(message) != null && getRetryCount(message) > 0) {
                retryRecoverer = retryExecutors.get(getRetryCount(message)-1);
            } else {
                retryRecoverer = retryExecutors.get(0);
            }
            retryRecoverer.recover(message, cause);
        } else {
            //Retries exchausted - do nothing
        }
    }

    private void createRetryExecutor(AmqpTemplate template, String exchangeName) {
        RepublishMessageRecoverer retryRecoverer = new RepublishMessageRecoverer(template, exchangeName);
        retryRecoverer.errorRoutingKeyPrefix(""); //Set KeyPrefix to "" so original key is reused during retries
        retryExecutors.add(retryRecoverer);
    }   

    private Integer getRetryCount(Message msg) {
        Integer retryCount;
        if(msg.getMessageProperties().getHeaders().get(RETRY_COUNT_HEADER_NAME) == null) {
            retryCount = 1;
        } else {
            retryCount =  (Integer) msg.getMessageProperties().getHeaders().get(RETRY_COUNT_HEADER_NAME);
        }
        
        return retryCount;
    }

    private void incrementRetryCount(Message msg) {
        Integer retryCount;
        if(msg.getMessageProperties().getHeaders().get(RETRY_COUNT_HEADER_NAME) == null) {
            retryCount = 1;
        } else {
            retryCount =  (Integer) msg.getMessageProperties().getHeaders().get(RETRY_COUNT_HEADER_NAME)+1;
        }
        msg.getMessageProperties().getHeaders().put(RETRY_COUNT_HEADER_NAME, retryCount);
    }

}

'CustomRetryTemplate' 的代码未在此处发布,但它包含 maxRetryCount、initialInterval 和 multiplier 的简单变量。

于 2015-10-20T23:19:22.707 回答
0
  1. 是的
  2. 通过 4 ...

您可以使用重试最大尝试次数 = 1 的子类RepublishMessageRecoverer并实现additionalHeaders添加,例如重试计数标头。

然后,您可以为每次尝试重新发布到不同的队列。

恢复器的结构并没有真正发布到不同的队列(我们应该改变它),因此您可能需要编写自己的恢复器并委托给几个RepublishMessageRecoverer.

考虑将您的解决方案贡献给框架。

于 2015-09-18T17:07:46.623 回答