1

我正在使用 spring-boot-1.5.10 和 spring-cloud,并使用 spring-cloud-starter-aws-messaging。我能够发送和接收消息,但无法获取 SNS 消息属性。如有任何帮助,不胜感激。代码如下:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Spring Boot parent POM: supplies the versions for the Spring Boot starters declared below. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.19.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.aws.sample</groupId>
    <artifactId>aws</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>aws</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <!-- Release train referenced by the spring-cloud-dependencies import in dependencyManagement. -->
        <spring-cloud.version>Edgware.SR5</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Cloud AWS starters; no explicit versions here, they come from the imported BOM. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-aws</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-aws-messaging</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!-- Imports the Spring Cloud BOM so that the spring-cloud artifacts above resolve to managed versions. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Controller.java

/**
 * REST controller that publishes an {@code Employee} to the {@code sample-sns}
 * destination and also hosts the SQS listener that consumes the resulting
 * notification.
 */
@RestController
@RequestMapping("/sns")
@AllArgsConstructor
public class SimpleSnsController {

    /** Template used for publishing; populated by the constructor that {@code @AllArgsConstructor} generates. */
    private NotificationMessagingTemplate notificationMessagingTemplate;

    /**
     * Publishes the posted employee together with a fixed set of custom headers.
     *
     * @param employee request body to publish
     * @return the literal string {@code "success"}
     */
    @PostMapping("/saveEmployee")
    public String save(@RequestBody Employee employee) {
        // Alternative that was tried earlier:
        // sendNotification("sample-sns", employee, "send employee details to sqs")
        this.notificationMessagingTemplate.convertAndSend("sample-sns", employee, buildHeaders());
        return "success";
    }

    /**
     * Consumes a message from the queue named by the {@code sqs.consumer.name} property
     * (a fixed queue name such as {@code "sample-queue"} was used previously).
     *
     * @param message  the message as a plain string
     * @param employee the payload bound through {@code @NotificationMessage}
     */
    @SqsListener("${sqs.consumer.name}")
    public void receiveSnsSqs(String message, @NotificationMessage Employee employee) {
        System.out.println("SNS Consumer received the message::" + message);
        System.out.println("SNS Consumer received the notificationMessage::" + employee);
        // Open point: the message attributes set in save() (e.g. name, traceId) should be read here.
    }

    /** Builds the custom headers that accompany every published employee. */
    private static Map<String, Object> buildHeaders() {
        Map<String, Object> attributes = new HashMap<>();
        attributes.put("traceId", "sample");
        attributes.put("name", "murugan");
        attributes.put("subject", "send employee details to sqs");
        return attributes;
    }
}

收到的输出消息:

{
  "Type" : "Notification",
  "MessageId" : "ba9dab52-aae8-5940-a3e2-ff8c8458ef52",
  "TopicArn" : "arn:aws:sns:XXX",
  "Message" : "{\"name\":\"David\",\"age\":\"31\",\"designation\":\"developer\"}",
  "Timestamp" : "2019-02-13T14:40:48.501Z",
  "SignatureVersion" : "1",
  "Signature" : "XXX",
  "SigningCertURL" : "XXX",
  "UnsubscribeURL" : "XXX",
  "MessageAttributes" : {
    "traceId" : {"Type":"String","Value":"sample"},
    "subject" : {"Type":"String","Value":"send employee details to sqs"},
    "name" : {"Type":"String","Value":"murugan"},
    "id" : {"Type":"String","Value":"68bf17f2-0f88-4cc5-0609-0ccd42b19ce4"},
    "SenderId" : {"Type":"String","Value":"David"},
    "contentType" : {"Type":"String","Value":"application/json;charset=UTF-8"},
    "timestamp" : {"Type":"Number.java.lang.Long","Value":"1550068848349"}
  }
}

我想在消费者中获取我在 SNS 生产者中设置的消息属性,例如 name、traceId。我查阅了很多资料,但没有找到任何解决方案。如有任何帮助,不胜感激。

4

1 回答 1

2

尝试为该订阅启用原始消息传递(raw message delivery)。启用后,SNS 不会再把原始消息包装在通知 JSON 中,你可以通过普通的 @Header、@Headers 注解获取消息和标头。

https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html

如果您不能使用原始消息传递,我编写了一个新的注解来帮助获取通知标头:

@NotificationHeader

import org.springframework.core.annotation.AliasFor;
import org.springframework.messaging.handler.annotation.ValueConstants;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marks a handler-method parameter whose value should be taken from a header
 * of a notification message. Parameters carrying this annotation are picked up
 * by {@code NotificationHeaderArgumentResolver}, which checks for it in
 * {@code supportsParameter}.
 *
 * <p>Retained at runtime and applicable to method parameters only.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface NotificationHeader {
    /**
     * Alias for {@link #name}.
     */
    @AliasFor("name")
    String value() default "";

    /**
     * The name of the notification header to bind to.
     * <p>Alias for {@link #value}; defaults to the empty string.
     */
    @AliasFor("value")
    String name() default "";

    /**
     * Whether the header is required.
     * <p>Default is {@code true}, leading to an exception if the header is
     * missing. Switch this to {@code false} if you prefer a {@code null}
     * value in case of a header missing.
     * @see #defaultValue
     */
    boolean required() default true;

    /**
     * The default value to use as a fallback.
     * <p>Supplying a default value implicitly sets {@link #required} to {@code false}.
     * <p>Defaults to {@link ValueConstants#DEFAULT_NONE}.
     */
    String defaultValue() default ValueConstants.DEFAULT_NONE;
}

*NotificationHeaderArgumentResolver

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.cloud.aws.messaging.support.NotificationMessageArgumentResolver;
import org.springframework.core.MethodParameter;
import org.springframework.core.convert.ConversionService;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver;
import org.springframework.util.Assert;

/**
 * Argument resolver for handler-method parameters annotated with
 * {@link NotificationHeader}.
 *
 * <p>The incoming message is first passed through a
 * {@link NotificationMessageArgumentResolver} configured with a pass-through
 * converter, and the regular header resolution of
 * {@link HeaderMethodArgumentResolver} is then applied to the message that
 * comes back instead of to the original one.
 *
 * <p>NOTE(review): this relies on {@code NotificationMessageArgumentResolver}
 * handing the converter a message whose headers are the notification's message
 * attributes; confirm against the spring-cloud-aws version in use.
 */
public class NotificationHeaderArgumentResolver extends HeaderMethodArgumentResolver {

    /** Delegate used to obtain the inner notification message; assigned once in the constructor. */
    private final NotificationMessageArgumentResolver notificationArgumentResolver;

    /**
     * Creates the resolver.
     *
     * @param cs          conversion service handed to the superclass
     * @param beanFactory bean factory handed to the superclass
     */
    public NotificationHeaderArgumentResolver(ConversionService cs, ConfigurableBeanFactory beanFactory) {
        super(cs, beanFactory);

        this.notificationArgumentResolver =
                new NotificationMessageArgumentResolver(new NoOptMessageConverter());
    }

    /**
     * Supports exactly those parameters that carry {@link NotificationHeader}.
     */
    @Override
    public boolean supportsParameter(MethodParameter parameter) {
        return parameter.hasParameterAnnotation(NotificationHeader.class);
    }

    /**
     * Resolves the named header from the message returned by the notification
     * delegate rather than from {@code message} itself.
     *
     * @param parameter the annotated method parameter
     * @param message   the message as received by the listener
     * @param name      the header name to look up
     * @return whatever the superclass resolves for {@code name} on the inner message
     * @throws Exception if the delegate or the superclass resolution fails
     */
    @Override
    @Nullable
    protected Object resolveArgumentInternal(MethodParameter parameter, Message<?> message, String name)
            throws Exception {

        // The pass-through converter returns the Message it is given, so the
        // delegate's result is a Message; cast with a wildcard instead of a raw type.
        Message<?> notificationMessage =
                (Message<?>) notificationArgumentResolver.resolveArgument(parameter, message);

        return super.resolveArgumentInternal(parameter, notificationMessage, name);
    }

    /**
     * Builds the named-value metadata from the parameter's {@link NotificationHeader}.
     *
     * @throws IllegalStateException if the parameter does not carry the annotation
     */
    @Override
    protected NamedValueInfo createNamedValueInfo(MethodParameter parameter) {
        NotificationHeader annotation = parameter.getParameterAnnotation(NotificationHeader.class);
        Assert.state(annotation != null, "No Header annotation");
        return new HeaderNamedValueInfo(annotation);
    }

    /** Named-value metadata populated from a {@link NotificationHeader} annotation. */
    private static class HeaderNamedValueInfo extends NamedValueInfo {

        private HeaderNamedValueInfo(NotificationHeader annotation) {
            super(annotation.name(), annotation.required(), annotation.defaultValue());
        }
    }

    /**
     * Converter that performs no conversion: {@link #fromMessage} hands back the
     * message itself and {@link #toMessage} always returns {@code null}.
     */
    public static class NoOptMessageConverter implements MessageConverter {
        /** Always returns {@code null}; this converter never creates messages. */
        @Override
        public Message<?> toMessage(Object payload, @Nullable MessageHeaders headers) {
            return null;
        }

        /** Returns {@code message} unchanged, ignoring {@code targetClass}. */
        @Override
        public Object fromMessage(Message<?> message, Class<?> targetClass) {
            return message;
        }
    }
}

*通知头配置

    /**
     * Declares a {@code QueueMessageHandlerFactory} bean whose argument resolvers
     * consist of a single {@code NotificationHeaderArgumentResolver}.
     *
     * <p>The resolver is constructed with {@code null} for both the conversion
     * service and the bean factory; both values are forwarded to its superclass
     * constructor.
     *
     * @return the configured factory
     */
    @Bean
    public QueueMessageHandlerFactory queueMessageHandlerFactory() {
        NotificationHeaderArgumentResolver headerResolver = new NotificationHeaderArgumentResolver(null, null);

        QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
        factory.setArgumentResolvers(Collections.singletonList(headerResolver));
        return factory;
    }
于 2019-04-25T11:16:44.767 回答