我正在尝试使用 SQS 消费者配置 SNS 生产者,并使用 Spring Boot 应用程序构建。我曾尝试利用 Spring Cloud 并仅使用 JMS API,这两种方式我似乎都无法让我的消费者将消息转换为我阅读。使用 Spring cloud,我收到一条错误消息,指出“有效负载不是有效的通知。下面的代码:非常感谢任何帮助
社交网络配置:
@Bean
public AmazonSNS snsClient() {
AmazonSNS snsClient = AmazonSNSClientBuilder.standard()
.withEndpointConfiguration(<endpoint-stuff)).build();
return snsClient;
}
@Bean
public NotificationMessagingTemplate notificationMessagingTemplate(
AmazonSNS amazonSNS) {
return new NotificationMessagingTemplate(amazonSNS);
}
生产者代码:
@Autowired
NotificationMessagingTemplate messagingTemplate;
public void send() throws Exception {
MessageDto message = new MessageDto();
message.setMessageA("Hello");
message.setMessageB("Again");
messagingTemplate.sendNotification("my_topic",message,"Test");
}
SQS 消费者配置:
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer()
{
SimpleMessageListenerContainer msgListenerContainer =
simpleMessageListenerContainerFactory()
.createSimpleMessageListenerContainer();
msgListenerContainer.setMessageHandler(queueMessageHandler());
return msgListenerContainer;
}
@Bean
public SimpleMessageListenerContainerFactory
simpleMessageListenerContainerFactory() {
SimpleMessageListenerContainerFactory msgListenerContainerFactory =
new SimpleMessageListenerContainerFactory();
msgListenerContainerFactory.setAmazonSqs(amazonSQSClient());
return msgListenerContainerFactory;
}
@Bean
public QueueMessageHandler queueMessageHandler() {
QueueMessageHandlerFactory queueMsgHandlerFactory = new
QueueMessageHandlerFactory();
queueMsgHandlerFactory.setAmazonSqs(amazonSQSClient());
QueueMessageHandler queueMessageHandler =
queueMsgHandlerFactory.createQueueMessageHandler();
//List<HandlerMethodArgumentResolver> list = new ArrayList<>();
//HandlerMethodArgumentResolver resolver = new
//PayloadArgumentResolver(new MappingJackson2MessageConverter());
//list.add(resolver);
//queueMessageHandler.setArgumentResolvers(list);
return queueMessageHandler;
}
@Lazy
@Bean(name = "amazonSQS", destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQSClient() {
AwsClientBuilder.EndpointConfiguration endpointConfiguration = new
AwsClientBuilder.EndpointConfiguration(
"<url>, "<region");
AmazonSQSAsync awsSQSAsync =
AmazonSQSAsyncClientBuilder.standard()
.withEndpointConfiguration(endpointConfiguration)
.build();
return awsSQSAsync;
}
SQS 监听器:
@SqsListener(value = "my_queue", deletionPolicy =
SqsMessageDeletionPolicy.ON_SUCCESS)
public void consume(final @NotificationMessage MessageDto event) {
LOGGER.info("Message received {}
{}",event.getMessageA(),event.getMessageB());
}
堆栈跟踪:
Caused by:
org.springframework.messaging.converter.MessageConversionException:
Payload: '<payload here>' is not a valid notification
at
org.springframework.cloud.aws.messaging.support.
converter.NotificationRequestConverter.fromMessage
(NotificationRequestConverter.java:69)
at