1

我想使用 spring-integration-aws 向 AWS SNS 发送消息并从 AWS SQS 接收消息。我在理解如何将我的应用程序从 spring-coud-aws-messaging 迁移到使用它时遇到了一些麻烦。我的代码基本上是一个 SQS 配置类:

@Configuration
@EnableConfigurationProperties(SqsProperties.class)
@Profile("!test")
public class SqsConfiguration {

     private final SqsProperties sqsProperties;  

     @Autowired  
     public SqsConfiguration(SqsProperties sqsProperties) {  
          this.sqsProperties = sqsProperties;  
     }  

     @Bean  
     public SimpleMessageListenerContainer simpleMessageListenerContainer() {  
          SimpleMessageListenerContainer msgListenerContainer =  
                simpleMessageListenerContainerFactory().createSimpleMessageListenerContainer();  
          msgListenerContainer.setMessageHandler(queueMessageHandler());  
          return msgListenerContainer;  
     }  

     @Bean  
     public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory() {  
          SimpleMessageListenerContainerFactory msgListenerContainerFactory = new SimpleMessageListenerContainerFactory();  
      msgListenerContainerFactory.setAmazonSqs(amazonSQSClient());  
      msgListenerContainerFactory.setDestinationResolver(customDestinationResolver(sqsProperties));  
     return msgListenerContainerFactory;  
      }  

        @Bean  
      public QueueMessageHandler queueMessageHandler() {  
            QueueMessageHandlerFactory queueMsgHandlerFactory = new QueueMessageHandlerFactory();  
      queueMsgHandlerFactory.setAmazonSqs(amazonSQSClient());  
     return queueMsgHandlerFactory.createQueueMessageHandler();  
      }  


        @Bean(name = "amazonSQS", destroyMethod = "shutdown")  
        public AmazonSQSAsync amazonSQSClient() {  
            return AmazonSQSAsyncClientBuilder.defaultClient();  
      }  

        @Bean  
      public SqsQueuesDestinationResolver customDestinationResolver(SqsProperties sqsProperties) {  
            return new SqsQueuesDestinationResolver(sqsProperties);  
      }  
    }

使用 customDestinationResolver (基本上是因为队列名称只能由于不同的部署而动态)定义为:

    public class SqsQueuesDestinationResolver implements DestinationResolver<String> {  

        private static final Logger LOG = LoggerFactory.getLogger(SqsQueuesDestinationResolver.class);  

     private SqsProperties sqsProperties;  

      @Autowired  
      private AmazonSQSAsync amazonSQSAsync;  

     public SqsQueuesDestinationResolver(SqsProperties sqsProperties) {  
            this.sqsProperties = sqsProperties;  
      }  

        @Override  
      public String resolveDestination(String queueName) throws DestinationResolutionException {  
            String finalQueueName = getFinalQueueName(queueName);  
     try {  
                return amazonSQSAsync.getQueueUrl(finalQueueName).getQueueUrl();  
      }  
            catch (QueueDoesNotExistException queueDoesNotExistException) {  
                LOG.error(String.format("The '%s' queue was not found.", finalQueueName));  
      }  
            return null;  
      }  

        private String getFinalQueueName(String queueName) {  
            String finalQueueName;  
     switch (queueName) {  
                case "queue-foo":  
                    finalQueueName = sqsProperties.getFooQueue();  
     break; case "queue-bar":  
                    finalQueueName = sqsProperties.getBarQueue();  
     break;  
     default:  
                    finalQueueName = null;  
      }  
            return finalQueueName;  
      }  
    }

仅此而已:基本上使用此配置,我只需要使用注释@SqsListener("foo-queue")@SqsListener("bar-queue")适当的使用者方法

@SqsListener("foo-queue")
public void listen(String message){
     processMessage(message);
}

我一直在尝试遵循https://github.com/spring-projects/spring-integration-aws#spring-integrations-extensions-to-aws中的文档,而我在 SQS 部分中难以理解的内容是“入站通道适配器”一章我应该将哪些队列作为参数提供给 SqsMessageDrivenChannelAdapter() 的构造函数,因为我使用的是自定义destinationResolver,以及我究竟如何使用这些消息,或者它是否应该与 Spring 的 @SqsListener 注释一起使用-cloud-aws-messaging 和以前一样。

非常感谢您的帮助,如果这不是问的正确地方,或者这是一个非常愚蠢的问题,我很抱歉,我只是第一次尝试这个:)

4

1 回答 1

0

我不确定你缺少什么,但你仍然可以在ctor中使用你的foo-queueand 。bar-queueSqsMessageDrivenChannelAdapter(AmazonSQSAsync amazonSqs, String... queues)

您的自定义DestinationResolver确实也可以在那里使用:

public void setDestinationResolver(DestinationResolver<String> destinationResolver) {
    this.simpleMessageListenerContainerFactory.setDestinationResolver(destinationResolver);
}

其实这SqsMessageDrivenChannelAdapter完全是基于相同的SimpleMessageListenerContainerFactory

要使用从 SQS 接收到的消息,您只需使用 a 配置此通道适配器MessageChannel以生成消息。然后你订阅下游的那个频道。

在 Spring Integration 参考手册中查看有关通道和消息的更多信息:https ://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/index.html

于 2020-04-08T20:11:43.363 回答