我有一个 Spring Boot 应用程序并希望接收来自多个 AWS SQS 队列的消息。这些队列都有自己的凭据(遗憾的是我对此无能为力)。这些凭据都不能访问其他队列之一,它们都仅限于一个队列。
只有一个队列和凭证,这很简单。我只需将凭据提供为AWSCredentialsProvider
Bean 并用@SqsListener
\注释我的方法@EnableSqs
。
但我无法弄清楚如何使用多个凭据来做到这一点。
@SqsListener
注释无法提供凭据、预配置对象AmazonSqs
或任何其他有帮助的东西。
CredentialsProvider
我通过扩展或客户端搜索了一种将队列映射到凭据的方法AmazonSqs
,但无济于事。
我什至尝试在 AmazonHttpClient 的标头中注入凭据,但这也是不可能的。
我试图创建手动收听 SQS 队列所需的一切。但我坚持为SimpleMessageListenerContainer
.
requiredQueueMessageHandler
仅在创建为具有应用程序上下文的 bean 时才有效。否则它不会查找带有注释的方法@SqsListener
。
可悲的是,我能找到的唯一教程或示例要么使用我想避免的 JMS,要么仅使用@SqsListener
带有一个队列的注释。
有没有其他方法可以为多个队列提供不同的凭据?
我的测试代码:
@Component
@Slf4j
public class TestOneQueueA {
public static final String QUEUE_A = "TestOneQueueA";
public TestOneQueueA(Cloud cloud, ResourceIdResolver resourceIdResolver) {
SqsServiceInfo serviceInfo = (SqsServiceInfo) cloud.getServiceInfo(QUEUE_A);
AWSStaticCredentialsProvider credentialsProvider =
new AWSStaticCredentialsProvider(new BasicAWSCredentials(serviceInfo.getAccessKey(),
serviceInfo.getSecretAccessKey()));
AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion(serviceInfo.getRegion()).build();
QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
queueMessageHandlerFactory.setAmazonSqs(client);
queueMessageHandlerFactory.setMessageConverters(Collections.singletonList(new MappingJackson2MessageConverter()));
QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
queueMessageHandler.afterPropertiesSet(); // won't do anything because of no ApplicationContext
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(client);
factory.setResourceIdResolver(resourceIdResolver);
factory.setQueueMessageHandler(queueMessageHandler);
SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
try {
simpleMessageListenerContainer.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
simpleMessageListenerContainer.start();
simpleMessageListenerContainer.start(QUEUE_A); // fails with "Queue with name 'TestOneQueueA' does not exist"
}
@SqsListener(value = QUEUE_A, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
log.info("Received SQS Message: \nSubject: %s \n%s", subject, dto);
}
}
编辑:
在尝试了更多之后,我能够将我的 AmazonSQS 客户端注入两个单独的SimpleMessageListenerContainer
. 然后问题就变成了QueueMessageHandler
。
如果我在没有 bean 上下文的情况下手动创建它,它根本不会查找带有@SqsListener
注释的任何方法。并且无法手动设置处理程序。
如果我将它创建为 bean,它将查看每个 bean 的注释。所以它也会找到它不应该寻找的队列的方法。然后它会崩溃,因为凭据不起作用。
我想不出一种方法来QueueMessageHandler
只为单个 SqsListener 方法创建一个。除了.
_SimpleMessageListenerContainer
QueueMessageHandler