2

我正在尝试使用 SQS 消费者配置 SNS 生产者,并使用 Spring Boot 应用程序构建。我曾尝试利用 Spring Cloud 并仅使用 JMS API,这两种方式我似乎都无法让我的消费者将消息转换为我阅读。使用 Spring cloud,我收到一条错误消息,指出“有效负载不是有效的通知。下面的代码:非常感谢任何帮助

社交网络配置:

@Bean
public AmazonSNS snsClient() {
     AmazonSNS snsClient = AmazonSNSClientBuilder.standard()
          .withEndpointConfiguration(<endpoint-stuff)).build();
    return snsClient;
}

@Bean
public NotificationMessagingTemplate notificationMessagingTemplate(
        AmazonSNS amazonSNS) {
    return new NotificationMessagingTemplate(amazonSNS);
}

生产者代码:

@Autowired
NotificationMessagingTemplate messagingTemplate;

public void send() throws Exception {

    MessageDto message = new MessageDto();
    message.setMessageA("Hello");
    message.setMessageB("Again");

    messagingTemplate.sendNotification("my_topic",message,"Test");    
}

SQS 消费者配置:

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() 
 {
    SimpleMessageListenerContainer msgListenerContainer = 
     simpleMessageListenerContainerFactory()
            .createSimpleMessageListenerContainer();
    msgListenerContainer.setMessageHandler(queueMessageHandler());
    return msgListenerContainer;
}

@Bean
public SimpleMessageListenerContainerFactory 
    simpleMessageListenerContainerFactory() {
    SimpleMessageListenerContainerFactory msgListenerContainerFactory = 
    new SimpleMessageListenerContainerFactory();
    msgListenerContainerFactory.setAmazonSqs(amazonSQSClient());
    return msgListenerContainerFactory;
}


@Bean
public QueueMessageHandler queueMessageHandler() {
    QueueMessageHandlerFactory queueMsgHandlerFactory = new 
    QueueMessageHandlerFactory();
    queueMsgHandlerFactory.setAmazonSqs(amazonSQSClient());
    QueueMessageHandler queueMessageHandler = 
      queueMsgHandlerFactory.createQueueMessageHandler();
    //List<HandlerMethodArgumentResolver> list = new ArrayList<>();
    //HandlerMethodArgumentResolver resolver = new 
    //PayloadArgumentResolver(new MappingJackson2MessageConverter());
    //list.add(resolver);
    //queueMessageHandler.setArgumentResolvers(list);
    return queueMessageHandler;


}




@Lazy
@Bean(name = "amazonSQS", destroyMethod = "shutdown")
public AmazonSQSAsync amazonSQSClient() {
    AwsClientBuilder.EndpointConfiguration endpointConfiguration = new 
    AwsClientBuilder.EndpointConfiguration(
            "<url>, "<region");
    AmazonSQSAsync awsSQSAsync = 

    AmazonSQSAsyncClientBuilder.standard()
    .withEndpointConfiguration(endpointConfiguration)
           .build();
    return awsSQSAsync;
}

SQS 监听器:

@SqsListener(value = "my_queue", deletionPolicy = 
 SqsMessageDeletionPolicy.ON_SUCCESS)
public void consume(final @NotificationMessage MessageDto event) {
    LOGGER.info("Message received {} 
         {}",event.getMessageA(),event.getMessageB());
}

堆栈跟踪:

Caused by: 
org.springframework.messaging.converter.MessageConversionException: 
Payload: '<payload here>' is not a valid notification
at 
org.springframework.cloud.aws.messaging.support.
converter.NotificationRequestConverter.fromMessage
(NotificationRequestConverter.java:69)
at
4

0 回答 0