2

在处理来自 RabbitMQ 的消息时出现异常,我只想取消确认并将特定消息放回差异队列,或者重新排队到同一个队列或完全丢弃消息(根据 basicNack 中的最后一个布尔标志@requeue)。

整个想法是稍后我可以获得未确认消息的计数并检查消息格式等,而不是一次又一次地重新排队到同一个频道,而且我想将未确认的信号发送到当前频道。

仅供参考,我将通道确认模式设置为手动(即 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);)

这就是我现在正在做的事情。

public class My***Listener implements ChannelAwareMessageListener{

try{

    @Override
    public void onMessage(Message message,Channel channel) throws Exception {   
    String s = new String(message.getBody());
    //some logic
    //after successful ack manually
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
catch(Exception e){
      //currently on exception i am unack the channel
      channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}

任何帮助都是非常可观的。

4

3 回答 3

4

您可以将它们发送到死信队列。这是一个非常标准的模式。

https://www.rabbitmq.com/dlx.html

于 2014-08-18T08:04:46.050 回答
3

你需要这样的东西:

@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .withMaxAttempts(5)
            .setRecoverer(new RepublishMessageRecoverer(amqpTemplate(), "bar", "baz"))
            .build();
}

注意:它仅适用于spring-amqp 1.3+ 另见参考

于 2014-08-18T08:57:10.357 回答
1

如果您更喜欢使用声明,请参阅此答案以获取示例。

要将消息路由到DLX,您可以设置defaultRequeuRejectedfalse(在侦听器容器上)。或者你可以抛出一个AmqpRejectAndDontRequeueException告诉容器你希望这条消息被拒绝(而不是重新排队)。

容器的默认行为是将被拒绝的消息重新排队。

您可以使用带有 a 的重试拦截器RejectAndDontRequeueRecoverer来自动抛出异常;或者,正如@Jawo99 所说,您可以使用重新发布恢复器 - 这具有将堆栈跟踪添加为标题的额外好处。DLX 只是路由原始消息。

于 2014-08-18T12:41:42.630 回答