3

我正在将 Spring Integration 与 AWS SQS 队列集成。

当我用注释的方法 @ServiceActivator引发异常时,我遇到了问题。在这种情况下,消息似乎无论如何都会从队列中删除。我已配置MessageDeletionPolicyON_SUCCESSin SqsMessageDrivenChannelAdapter

这是我的频道/适配器配置 https://github.com/sdusza1/spring-integration-sqs/blob/master/src/main/java/com/example/demo/ChannelConfig.java

我已经尝试使用@SqsListener注释做同样的事情,并且消息没有按预期删除。

我在这里创建了一个迷你 Spring Boot 应用程序来演示这个问题: https ://github.com/sdusza1/spring-integration-sqs

请帮忙 :)

4

1 回答 1

4

你的配置是这样的:

@Bean
public MessageProducerSupport sqsMessageDrivenChannelAdapter() {
    SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(amazonSqs, SQS_QUEUE_NAME);
    adapter.setOutputChannel(inboundChannel());
    adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
    adapter.setVisibilityTimeout(RETRY_NOTIFICATION_AFTER);
    return adapter;

}

是这样的inboundChannel

 @Bean
    public QueueChannel inboundChannel() {
        return new QueueChannel();
 }

因此,这是一个队列,因此异步和来自该队列的消息在一个单独的线程上处理,该线程根据您的配置TaskScheduler轮询这种通道。PollerMetadata在这种情况下,消费者中的任何错误也会被抛出到该线程中,并且不会达到SqsMessageDrivenChannelAdapter预期的错误处理。

这在技术上与您真正在容器线程上直接调用的体验完全不同@SqsListener,因此应用了它的错误处理。

或者您需要修改您的逻辑,您希望如何处理该单独线程中的错误,或者只是不使用 a QueueChanneljust afterSqsMessageDrivenChannelAdapter并让它在底层 SQS 侦听器容器中抛出和处理错误,因为它是@SqsListener.

于 2019-04-04T16:51:17.280 回答