1

我有一个IntegrationFlow这样定义的弹簧集成:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean)
                    .get();

serviceActivatorBean看起来像这样:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator

    public void myMethod(Collection<MyEvent> events) {
        ....
    }    
}

如果myMethod抛出异常,它将被记录但不会重试。我试图将其更改IntegrationFlow为:

RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(5);
retryTemplate.setRetryPolicy(retryPolicy);
advice.setRetryTemplate(retryTemplate);

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .autoStartup(autoStartup)
                    .adviceChain(advice)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean)
                    .get();

但后来我收到这样的日志消息(不会发生重试):

2017-06-30 13:18:10.611 WARN 88706 --- [erContainer#1-2] osihaRequestHandlerRetryAdvice:此建议 org.springframework.integration.handler.advice.RequestHandlerRetryAdvice 只能用于 MessageHandlers;在“org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1”中建议方法“invokeListener”的尝试被忽略

我怎样才能将其配置IntegrationFlow为与 a 一样的行为方式RabbitListener?即让RabbitMQ 再次发布消息。

4

1 回答 1

1

如消息所述,在适配器的建议链中使用重试拦截器RequestHandlerRetryAdvice,而不是用于使用端点的 - 。

于 2017-06-30T13:16:24.097 回答