4

我目前正在使用带有 spring (spring-rabbit-1.2.0-RELEASE) 的 RabbitMQ,配置如下:

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
<!-- Asynchronous exchanges -->
<!-- Admin -->
<rabbit:admin connection-factory="connectionFactory"/>

<!-- Error Handler -->
<bean id="biErrorHandler" class="my.project.sync.BiErrorHandler" />
<!-- Message converter -->
<bean id="biMessageConverter" class="my.project.sync.BiMessageConverter"/>

<bean id="retryInterceptor" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer"/>
    <property name="retryOperations" ref="retryTemplate" />
    <property name="messageKeyGenerator" ref="biKeyGenerator" />
</bean> 

<bean id="biKeyGenerator" class="my.project.sync.BiMessageKeyGenerator"/>
<bean id="rejectAndDontRequeueRecoverer" class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="3000" />
            <property name="maxInterval" value="30000" />
        </bean>     
    </property>
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="3" />
        </bean>     
    </property>
</bean>  

 <rabbit:queue id="biSynchronizationQueue" name="BI_SYNCHRONIZATION_QUEUE" durable="true" />
<rabbit:listener-container message-converter="biMessageConverter" concurrency="1" 
    connection-factory="connectionFactory"
    error-handler="biErrorHandler"
    advice-chain="retryInterceptor"
    acknowledge="auto">
    <rabbit:listener queues="BI_SYNCHRONIZATION_QUEUE" ref="biSynchronizationService" method="handleMessage"/>
</rabbit:listener-container>

我想在第三次尝试后将消息重新排入队列尾部。但我找不到执行此操作的方法。

有人有想法吗?

在此先感谢您的帮助。

4

2 回答 2

7

不使用RejectAndDontRequeueRecoverer,而是使用使用 a 的自定义MessageRecovererRabbitTemplate消息发送到队列的后面;然后抛出一个AmqpRejectAndDontRequeueException这样的消息将被拒绝。

您可以将 子类化RejectAndDontRequeueRecoverer,发送消息recover()然后调用super.recover()(这只是引发异常)。或者只是在您自己的recover().

于 2014-02-04T16:01:39.663 回答
3

我使用了这个技巧,但后来使用了一个死信队列,朝着稍微不同的方向发展。对我来说,这比将消息放在队列末尾更明确。

 <rabbit:queue name="content.variantchange.queue" durable="true" queue-arguments="queueArguments"/>

<util:map id="queueArguments">
    <entry key="x-dead-letter-exchange" value="content.deadletter.topic"/>
</util:map>

  <rabbit:fanout-exchange name="content.deadletter.topic">
    <rabbit:bindings>
        <rabbit:binding queue="content.deadletter.queue"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<rabbit:queue name="content.deadletter.queue" durable="true"/>

一旦你有了死信队列,你可以通过另一个消费者做任何你想做的事情(包括将它添加回原始队列。)

我最终还使用了 Gary 推荐的无状态版本的拦截器,这让我不必担心消息 id 生成器。

<bean id="retryInterceptor" class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer"/>
    <property name="retryOperations" ref="retryTemplate" />
</bean>
于 2014-03-27T09:08:26.670 回答