
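I have configured my message listener container to use spring-retry to handle retries, but when someone sends a message without a message ID, the message listener stops. Can this behavior be changed so that the message is routed to the dead letter queue instead of stopping the listener?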

My retry configuration is as follows:

@Bean
public StatefulRetryOperationsInterceptor retryInterceptor() {
  StatefulRetryOperationsInterceptorFactoryBean f =
      new StatefulRetryOperationsInterceptorFactoryBean();
  f.setRetryOperations(retryTemplate());
  f.setMessageRecoverer(new RejectAndDontRequeueRecoverer());

  return f.getObject();
}

private RetryOperations retryTemplate() {
  RetryTemplate retryTemplate = new RetryTemplate();
  ExponentialRandomBackOffPolicy backOffPolicy = new ExponentialRandomBackOffPolicy();

  backOffPolicy.setInitialInterval(50);
  backOffPolicy.setMultiplier(1.5);
  backOffPolicy.setMaxInterval(1000);
  retryTemplate.setBackOffPolicy(backOffPolicy);

  SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
  retryPolicy.setMaxAttempts(10);
  retryTemplate.setRetryPolicy(retryPolicy);

  return retryTemplate;
}

I get the following exception:

2014-08-01 08:50:27,858 [taskExecutor<OmittedForPrivacy>-2] WARN  mqp.rabbit.listener.SimpleMessageListenerContainer - Execution of Rabbit message listener failed, and no ErrorHandler has been set.
org.springframework.amqp.rabbit.listener.FatalListenerExecutionException: Illegal null id in message. Failed to manage retry for message: (Body:'{
    <Omitted for privacy>
}'; ID:null; Content:application/json; Headers:{__TypeId__=<OmittedForPrivacy>}; Exchange:; RoutingKey:<OmittedForPrivacy>; Reply:null; DeliveryMode:NON_PERSISTENT; DeliveryTag:1)
    at org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean$3.getKey(StatefulRetryOperationsInterceptorFactoryBean.java:114) ~[spring-rabbit-1.2.1.RELEASE.jar:na]
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:132) ~[spring-retry-1.1.0.RELEASE.jar:na]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.0.0.RELEASE.jar:4.0.0.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) ~[spring-aop-4.0.0.RELEASE.jar:4.0.0.RELEASE]
    at com.sun.proxy.$Proxy612.invokeListener(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:620) [spring-rabbit-1.2.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:454) ~[spring-rabbit-1.2.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:480) [spring-rabbit-1.2.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:464) [spring-rabbit-1.2.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:61) [spring-rabbit-1.2.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:558) [spring-rabbit-1.2.1.RELEASE.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_17]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_17]
    at java.lang.Thread.run(Thread.java:722) [na:1.7.0_17]

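So instead of the message listener container stopping after this exception, I would like the message to simply be put on the dead letter queue.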


1 Answer


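What do you mean by the listener "stops"? Please show your configuration, and post a debug log somewhere that shows the behavior you describe.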

EDIT:

Ah, the RejectAndDontRequeueRecoverer only recovers after listener exceptions; this exception is thrown before we even get into the retry logic.

You can add a MissingMessageIdAdvice to the advice chain (before the retry advice).

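It lets a message with a missing ID be tried, but throws an AmqpRejectAndDontRequeueException if the message is redelivered (and fails again). Such a message is retried only once, regardless of the retry settings.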
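As a rough sketch of that wiring, assuming Java configuration like the one in the question: the class name RetryAdviceConfig, the queue name "my.queue", and the injected ConnectionFactory and MessageListener beans are placeholders, not taken from the question. The advice and the RetryTemplate should share one RetryContextCache so the advice can clean up the entry it creates for a message without an ID.

```java
import org.aopalliance.aop.Advice;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.MissingMessageIdAdvice;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;
import org.springframework.retry.policy.MapRetryContextCache;
import org.springframework.retry.policy.RetryContextCache;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class RetryAdviceConfig { // hypothetical class name

    // One cache shared by the retry template and the missing-id advice.
    @Bean
    public RetryContextCache retryContextCache() {
        return new MapRetryContextCache();
    }

    @Bean
    public MissingMessageIdAdvice missingMessageIdAdvice() {
        return new MissingMessageIdAdvice(retryContextCache());
    }

    @Bean
    public StatefulRetryOperationsInterceptor retryInterceptor() {
        RetryTemplate retryTemplate = new RetryTemplate();
        // Set the back-off and retry policies here, as in the question.
        retryTemplate.setRetryContextCache(retryContextCache());

        StatefulRetryOperationsInterceptorFactoryBean f =
                new StatefulRetryOperationsInterceptorFactoryBean();
        f.setRetryOperations(retryTemplate);
        f.setMessageRecoverer(new RejectAndDontRequeueRecoverer());
        return f.getObject();
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                    MessageListener listener) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("my.queue"); // placeholder queue name
        container.setMessageListener(listener);
        // Order matters: the missing-id advice must run before the retry advice.
        container.setAdviceChain(new Advice[] { missingMessageIdAdvice(), retryInterceptor() });
        return container;
    }
}
```

A rejected message only reaches a dead letter queue if the queue it was consumed from is declared with a dead letter exchange on the broker side.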

If you want to reject such messages outright, you will have to create your own advice.
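A custom advice for that could look roughly like the sketch below. The class name RejectMissingMessageIdAdvice is made up for illustration; it is not part of Spring AMQP. It assumes the container treats an AmqpRejectAndDontRequeueException thrown from the advice chain as "reject without requeue", so please verify that against the version you are running (the trace in the question shows spring-rabbit 1.2.1).

```java
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;

// Hypothetical advice: rejects any message that has no message ID
// instead of letting it reach the stateful retry interceptor.
public class RejectMissingMessageIdAdvice implements MethodInterceptor {

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        Message message = findMessage(invocation.getArguments());
        if (message != null && message.getMessageProperties().getMessageId() == null) {
            // Signals the container not to requeue the message.
            throw new AmqpRejectAndDontRequeueException(
                    "Rejecting message without a message ID");
        }
        return invocation.proceed();
    }

    // Looks for the Message among the advised method's arguments rather than
    // relying on a fixed argument position.
    private static Message findMessage(Object[] arguments) {
        for (Object argument : arguments) {
            if (argument instanceof Message) {
                return (Message) argument;
            }
        }
        return null;
    }
}
```

Like MissingMessageIdAdvice, this would have to come before the retry interceptor in the container's advice chain, since the "Illegal null id" failure is raised inside the retry interceptor itself.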

Answered 2014-07-31T14:28:08.040