5

我有一个 Spring Boot 应用程序并希望接收来自多个 AWS SQS 队列的消息。这些队列都有自己的凭据(遗憾的是我对此无能为力)。这些凭据都不能访问其他队列之一,它们都仅限于一个队列。

只有一个队列和凭证,这很简单。我只需将凭据提供为AWSCredentialsProviderBean 并用@SqsListener\注释我的方法@EnableSqs
但我无法弄清楚如何使用多个凭据来做到这一点。

@SqsListener注释无法提供凭据、预配置对象AmazonSqs或任何其他有帮助的东西。

CredentialsProvider我通过扩展或客户端搜索了一种将队列映射到凭据的方法AmazonSqs,但无济于事。
我什至尝试在 AmazonHttpClient 的标头中注入凭据,但这也是不可能的。

我试图创建手动收听 SQS 队列所需的一切。但我坚持为SimpleMessageListenerContainer.
requiredQueueMessageHandler仅在创建为具有应用程序上下文的 bean 时才有效。否则它不会查找带有注释的方法@SqsListener
可悲的是,我能找到的唯一教程或示例要么使用我想避免的 JMS,要么仅使用@SqsListener带有一个队列的注释。

有没有其他方法可以为多个队列提供不同的凭据?

我的测试代码:

@Component
@Slf4j
public class TestOneQueueA {

  public static final String QUEUE_A = "TestOneQueueA";

  public TestOneQueueA(Cloud cloud, ResourceIdResolver resourceIdResolver) {
    SqsServiceInfo serviceInfo = (SqsServiceInfo) cloud.getServiceInfo(QUEUE_A);
    AWSStaticCredentialsProvider credentialsProvider =
        new AWSStaticCredentialsProvider(new BasicAWSCredentials(serviceInfo.getAccessKey(),
            serviceInfo.getSecretAccessKey()));

    AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withRegion(serviceInfo.getRegion()).build();

    QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
    queueMessageHandlerFactory.setAmazonSqs(client);
    queueMessageHandlerFactory.setMessageConverters(Collections.singletonList(new MappingJackson2MessageConverter()));

    QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
    queueMessageHandler.afterPropertiesSet(); // won't do anything because of no ApplicationContext

    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(client);
    factory.setResourceIdResolver(resourceIdResolver);
    factory.setQueueMessageHandler(queueMessageHandler);

    SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    try {
      simpleMessageListenerContainer.afterPropertiesSet();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    simpleMessageListenerContainer.start();
    simpleMessageListenerContainer.start(QUEUE_A); // fails with "Queue with name 'TestOneQueueA' does not exist"
  }

  @SqsListener(value = QUEUE_A, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
  public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
    log.info("Received SQS Message: \nSubject: %s \n%s", subject, dto);
  }
}

编辑:

在尝试了更多之后,我能够将我的 AmazonSQS 客户端注入两个单独的SimpleMessageListenerContainer. 然后问题就变成了QueueMessageHandler

如果我在没有 bean 上下文的情况下手动创建它,它根本不会查找带有@SqsListener注释的任何方法。并且无法手动设置处理程序。
如果我将它创建为 bean,它将查看每个 bean 的注释。所以它也会找到它不应该寻找的队列的方法。然后它会崩溃,因为凭据不起作用。
我想不出一种方法来QueueMessageHandler只为单个 SqsListener 方法创建一个。除了.
_SimpleMessageListenerContainerQueueMessageHandler

4

2 回答 2

4

在花了一些时间寻找更好的解决方案之后,我坚持以下几点:

package test;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.AwsRegionProvider;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import test.TestDto;
import test.CustomQueueMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationSubject;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;

@Component
public class TestQueue {

  private static final String QUEUE_NAME = "TestQueue";
  private static final Logger log = LoggerFactory.getLogger(TestQueue.class);

  public TestQueue(AWSCredentialsProvider credentialsProvider, AwsRegionProvider regionProvider) {
    AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withRegion(regionProvider.getRegion())
        .build();

    // custom QueueMessageHandler to initialize only this queue
    CustomQueueMessageHandler queueMessageHandler = new CustomQueueMessageHandler();
    queueMessageHandler.init(this);
    queueMessageHandler.afterPropertiesSet();

    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(client);
    factory.setQueueMessageHandler(queueMessageHandler);

    SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    try {
      simpleMessageListenerContainer.afterPropertiesSet();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    simpleMessageListenerContainer.start();
  }

  @SqsListener(value = QUEUE_NAME, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
  public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
    log.info("Received SQS Message: \nSubject: {} \n{}", subject, dto);
  }
}

和习俗QueueMessageHandler

package test;

import java.util.Collections;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;

public class CustomQueueMessageHandler extends QueueMessageHandler {

  public void init(Object handler) {
    detectHandlerMethods(handler);
  }
}

的唯一目的CustomQueueMessageHandler是传递应该扫描 SQS 注释的单个对象。由于我没有使用 Spring Context 启动它,因此它不会在每个 bean 中搜索@SqsListener注释。但是所有的初始化都隐藏在受保护的方法后面。这就是为什么我需要覆盖类,以访问那些 init 方法。

我不认为这是一个非常优雅的解决方案,手动创建所有 AWS 客户端内容,并调用 bean init 方法。但这是我能找到的唯一一个仍然可以访问 AWS SQS 库的所有功能的解决方案,例如转换传入消息和通知、删除策略、队列轮询(包括故障处理)等。

于 2019-09-25T09:04:48.567 回答
1

@Bean您可以为您希望与 custom 一起使用的帐户声明不同的 s @Qualifier。假设您在两个不同的帐户中有两个 SQS 队列。然后声明两个 bean 类型AmazonSQS

@Qualifier("first")
@Bean public AmazonSQS amazonSQSClient() {
    return AmazonSQSClient.builder()
            .withCredentials(credentialsProvider())
            .withRegion(Regions.US_EAST_1)
            .build();
}

@Qualifier("second")
@Bean public AmazonSQS amazonSQSClient() {
    return AmazonSQSClient.builder()
            .withCredentials(anotherCredentialsProvider())
            .withRegion(Regions.US_EAST_1)
            .build();
}

然后在您的服务中,您可以@Autowired为他们服务。

@Autowired @Qualifier("second") private AmazonSQS sqsSecond;
于 2019-09-23T14:43:32.850 回答