1

我需要对所有 jms 侦听器应用一些预检查和常见步骤,例如根据模式(JSON 模式)验证原始消息。例子 -

/**
 * Example listener bean from the question: a Spring component with a single
 * JMS listener method whose payload parameter is an {@code Order}.
 */
@Component
public class MyService {

    /**
     * Listener for the {@code myDestination} destination.
     * The body is elided in the original example.
     */
    @JmsListener(destination = "myDestination")
    public void processOrder(Order order) { ... }
}

现在,在 spring 将 Message 从队列转换为 Order 之前,我需要执行以下操作 -

  1. 将原始消息连同消息头一起记录到自定义记录器中。
  2. 根据 JSON 模式验证 JSON 消息(文本消息)(为了简单起见,假设我这里只有一个模式)。
  3. 如果模式验证失败,记录错误并抛出异常。
  4. 如果模式验证通过,则把控制权交还给 Spring 完成转换,并继续执行 processOrder 方法。

Spring JMS 架构是否提供了任何方式来注入上述需求?我首先想到的是 AOP,但我不确定它是否可以与 @JmsListener 一起使用。

4

1 回答 1

1

一个相当简单的做法是在侦听器容器工厂上将 autoStartup 设置为 false。

然后,使用 JmsListenerEndpointRegistry bean 获取侦听器容器。

然后调用 getMessageListener() 取得侦听器,将其包装在 AOP 代理中,再调用 setMessageListener() 设置回容器。

然后启动容器。

可能有一种更优雅的方式,但我认为您必须深入了解侦听器创建代码的内容,这非常复杂。

编辑

Spring Boot 示例:

/**
 * Spring Boot demo: intercepts every message delivered to a {@code @JmsListener}
 * by replacing the container's message listener with an AOP proxy before the
 * listener containers are started.
 */
@SpringBootApplication
public class So49682934Application {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /** Boots the application. */
    public static void main(String[] args) {
        SpringApplication.run(So49682934Application.class, args);
    }

    /** Logs each {@link Foo} received from the {@code so49682934} destination. */
    @JmsListener(id = "listener1", destination = "so49682934")
    public void listen(Foo foo) {
        String rendered = foo.toString();
        logger.info(rendered);
    }

    /**
     * Runner that advises the {@code listener1} container's listener, starts the
     * registry, waits five seconds and then sends one test {@link Foo}.
     */
    @Bean
    public ApplicationRunner runner(JmsListenerEndpointRegistry registry, JmsTemplate template) {
        return args -> {
            DefaultMessageListenerContainer listenerContainer =
                    (DefaultMessageListenerContainer) registry.getListenerContainer("listener1");
            // NOTE(review): assumes the container has not been auto-started yet
            // (e.g. spring.jms.listener.auto-startup=false) - confirm in configuration.
            listenerContainer.setMessageListener(adviseOnMessage(listenerContainer.getMessageListener()));
            registry.start();
            Thread.sleep(5_000);
            template.convertAndSend("so49682934", new Foo("baz"));
        };
    }

    /** Wraps {@code target} in a proxy that runs {@link MyJmsInterceptor} around {@code onMessage}. */
    private static Object adviseOnMessage(Object target) {
        NameMatchMethodPointcutAdvisor onMessageAdvisor =
                new NameMatchMethodPointcutAdvisor(new MyJmsInterceptor());
        onMessageAdvisor.addMethodName("onMessage");

        ProxyFactory proxyFactory = new ProxyFactory(target);
        proxyFactory.addAdvisor(onMessageAdvisor);
        return proxyFactory.getProxy();
    }

    /** Jackson-based converter producing text messages with the type id in the {@code typeId} property. */
    @Bean
    public MessageConverter converter() {
        MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
        jsonConverter.setTypeIdPropertyName("typeId");
        jsonConverter.setTargetType(MessageType.TEXT);
        return jsonConverter;
    }

    /** Interceptor that logs the raw JMS message before letting the invocation continue. */
    public static class MyJmsInterceptor implements MethodInterceptor {

        private final Logger logger = LoggerFactory.getLogger(getClass());

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            // The advised method is onMessage; its first argument is the JMS message.
            Object firstArgument = invocation.getArguments()[0];
            Message jmsMessage = (Message) firstArgument;
            logger.info(jmsMessage.toString());
            // validate
            return invocation.proceed();
        }

    }

    /** Simple payload bean with a single {@code bar} property. */
    public static class Foo {

        private String bar;

        /** No-argument constructor, kept alongside the convenience constructor. */
        public Foo() {
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + bar + "]";
        }

    }

}

spring.jms.listener.auto-startup=false

2018-04-06 11:42:04.859 INFO 59745 --- [enerContainer-1] e.So49682934Application$MyJmsInterceptor : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-60138-1523029319662-4:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-60138-1523029319662-4:2:1:1, destination = queue://so49682934, transactionId = null, expiration = 0, timestamp = 1523029324849, arrival = 0, brokerInTime = 1523029324849, brokerOutTime = 1523029324853, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1050, properties = {typeId=com.example.So49682934Application$Foo}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"bar":"baz"}}

2018-04-06 11:42:04.882 INFO 59745 --- [enerContainer-1] ication$$EnhancerBySpringCGLIB$$e29327b8 : Foo [bar=baz]

编辑2

以下是如何通过基础设施来做到这一点......

/**
 * Spring Boot demo: hooks into the {@code @JmsListener} infrastructure so that
 * every listener adapter created by the annotation post processor is wrapped
 * in an AOP proxy that runs {@link MyJmsInterceptor} around {@code onMessage}.
 */
@SpringBootApplication
@EnableJms
public class So496829341Application {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /** Boots the application. */
    public static void main(String[] args) {
        SpringApplication.run(So496829341Application.class, args);
    }

    /** Logs each {@link Foo} received from the {@code so496829341} destination. */
    @JmsListener(id = "listen1", destination="so496829341")
    public void listen(Foo foo) {
        String rendered = foo.toString();
        logger.info(rendered);
    }

    /** Runner that waits five seconds and then sends one test {@link Foo}. */
    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            Thread.sleep(5_000);
            Foo payload = new Foo("baz");
            template.convertAndSend("so496829341", payload);
        };
    }

    /** Jackson-based converter producing text messages with the type id in the {@code typeId} property. */
    @Bean
    public MessageConverter converter() {
        MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
        jsonConverter.setTypeIdPropertyName("typeId");
        jsonConverter.setTargetType(MessageType.TEXT);
        return jsonConverter;
    }

    /**
     * Registers a customized annotation post processor under the framework's
     * well-known bean name. Its endpoints hand out proxied listener adapters.
     */
    @Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    public static JmsListenerAnnotationBeanPostProcessor bpp() {
        return new JmsListenerAnnotationBeanPostProcessor() {

            @Override
            protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
                return new MethodJmsListenerEndpoint() {

                    @Override
                    protected MessagingMessageListenerAdapter createMessageListener(
                            MessageListenerContainer container) {
                        return adviseOnMessage(super.createMessageListener(container));
                    }

                };
            }

        };
    }

    /**
     * Builds a class-based proxy around {@code target} whose {@code onMessage}
     * calls pass through {@link MyJmsInterceptor}.
     */
    private static MessagingMessageListenerAdapter adviseOnMessage(MessagingMessageListenerAdapter target) {
        NameMatchMethodPointcutAdvisor onMessageAdvisor =
                new NameMatchMethodPointcutAdvisor(new MyJmsInterceptor());
        onMessageAdvisor.addMethodName("onMessage");

        ProxyFactory proxyFactory = new ProxyFactory(target);
        // Proxy the target class so the result can be cast back to the adapter type.
        proxyFactory.setProxyTargetClass(true);
        proxyFactory.addAdvisor(onMessageAdvisor);
        return (MessagingMessageListenerAdapter) proxyFactory.getProxy();
    }

    /** Interceptor that logs the raw JMS message before letting the invocation continue. */
    public static class MyJmsInterceptor implements MethodInterceptor {

        private final Logger logger = LoggerFactory.getLogger(getClass());

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            // The advised method is onMessage; its first argument is the JMS message.
            Object firstArgument = invocation.getArguments()[0];
            Message jmsMessage = (Message) firstArgument;
            logger.info(jmsMessage.toString());
            // validate
            return invocation.proceed();
        }

    }

    /** Simple payload bean with a single {@code bar} property. */
    public static class Foo {

        private String bar;

        /** No-argument constructor, kept alongside the convenience constructor. */
        public Foo() {
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + bar + "]";
        }

    }

}

注意:BPP 必须是静态的,并且 @EnableJms 是必需的,因为此 BPP 的存在会使 Spring Boot 不再自动启用 JMS 注解支持。

2018-04-06 13:44:41.607 INFO 82669 --- [enerContainer-1] .So496829341Application$MyJmsInterceptor : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-63685-1523036676402-4:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-63685-1523036676402-4:2:1:1, destination = queue://so496829341, transactionId = null, expiration = 0, timestamp = 1523036681598, arrival = 0, brokerInTime = 1523036681598, brokerOutTime = 1523036681602, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1050, properties = {typeId=com.example.So496829341Application$Foo}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"bar":"baz"}}

2018-04-06 13:44:41.634 INFO 82669 --- [enerContainer-1] ication$$EnhancerBySpringCGLIB$$9ff4b13f : Foo [bar=baz]

编辑3

避免 AOP...

/**
 * Spring Boot demo: same interception point as the proxy-based variant, but
 * without AOP. Each listener adapter created by the annotation post processor
 * is replaced by a thin adapter that logs the message and then delegates.
 */
@SpringBootApplication
@EnableJms
public class So496829341Application {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /** Boots the application. */
    public static void main(String[] args) {
        SpringApplication.run(So496829341Application.class, args);
    }

    /** Logs each {@link Foo} received from the {@code so496829341} destination. */
    @JmsListener(id = "listen1", destination="so496829341")
    public void listen(Foo foo) {
        String rendered = foo.toString();
        logger.info(rendered);
    }

    /** Runner that waits five seconds and then sends one test {@link Foo}. */
    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            Thread.sleep(5_000);
            Foo payload = new Foo("baz");
            template.convertAndSend("so496829341", payload);
        };
    }

    /** Jackson-based converter producing text messages with the type id in the {@code typeId} property. */
    @Bean
    public MessageConverter converter() {
        MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
        jsonConverter.setTypeIdPropertyName("typeId");
        jsonConverter.setTargetType(MessageType.TEXT);
        return jsonConverter;
    }

    /**
     * Registers a customized annotation post processor under the framework's
     * well-known bean name. Its endpoints hand out delegating listener adapters.
     */
    @Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    public static JmsListenerAnnotationBeanPostProcessor bpp() {
        return new JmsListenerAnnotationBeanPostProcessor() {

            @Override
            protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
                return new MethodJmsListenerEndpoint() {

                    @Override
                    protected MessagingMessageListenerAdapter createMessageListener(
                            MessageListenerContainer container) {
                        // The adapter the framework would normally use; it still does the real work.
                        final MessagingMessageListenerAdapter wrappedListener =
                                super.createMessageListener(container);
                        return new MessagingMessageListenerAdapter() {

                            @Override
                            public void onMessage(Message jmsMessage, Session session) throws JMSException {
                                // NOTE(review): bpp() is static, so this `logger` cannot be the outer
                                // instance field; presumably it is inherited from the adapter - confirm.
                                logger.info(jmsMessage.toString());
                                // validate
                                wrappedListener.onMessage(jmsMessage, session);
                            }

                        };
                    }

                };
            }

        };
    }

    /** Simple payload bean with a single {@code bar} property. */
    public static class Foo {

        private String bar;

        /** No-argument constructor, kept alongside the convenience constructor. */
        public Foo() {
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + bar + "]";
        }

    }

}

编辑4

要访问侦听器方法上的其他注解,可以这样做,但需要通过反射来获取对 Method 的引用...

/**
 * Logs each {@code Foo} received from the {@code so496829341} destination.
 * The method additionally carries a {@code @Schema} annotation with the value {@code foo.bar}.
 */
@JmsListener(id = "listen1", destination="so496829341")
@Schema("foo.bar")
public void listen(Foo foo) {
    final String rendered = foo.toString();
    logger.info(rendered);
}

/**
 * Marker annotation carrying a single string value.
 * It may be placed on methods and on other annotation types, and it is
 * retained at runtime so it can be read reflectively.
 */
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
public @interface Schema {

    /** The value attached to the annotated element; there is no default. */
    String value();

}

/**
 * Registers a customized annotation post processor under the framework's
 * well-known bean name. Each listener adapter it creates is replaced by a
 * delegating adapter that logs the raw message and, when the listener method
 * carries a {@code @Schema} annotation, the schema value, before handing the
 * message to the original adapter.
 *
 * <p>Listener methods without {@code @Schema} are supported: the schema step
 * is skipped for them instead of failing on every delivery.
 *
 * @return the customized post processor
 */
@Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
public static JmsListenerAnnotationBeanPostProcessor bpp() {
    return new JmsListenerAnnotationBeanPostProcessor() {

        @Override
        protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
            return new MethodJmsListenerEndpoint() {

                @Override
                protected MessagingMessageListenerAdapter createMessageListener(
                        MessageListenerContainer container) {
                    final MessagingMessageListenerAdapter listener = super.createMessageListener(container);
                    // The handler method is read from the adapter's "handlerMethod" field
                    // reflectively, in order to inspect the annotations on the listener method.
                    InvocableHandlerMethod handlerMethod =
                            (InvocableHandlerMethod) new DirectFieldAccessor(listener)
                                    .getPropertyValue("handlerMethod");
                    // May be null: not every listener method has to declare @Schema.
                    final Schema schema = AnnotationUtils.getAnnotation(handlerMethod.getMethod(), Schema.class);
                    return new MessagingMessageListenerAdapter() {

                        @Override
                        public void onMessage(Message jmsMessage, Session session) throws JMSException {
                            logger.info(jmsMessage.toString());
                            if (schema != null) {
                                logger.info(schema.value());
                                // validate
                            }
                            listener.onMessage(jmsMessage, session);
                        }

                    };
                }

            };
        }

    };
}
于 2018-04-06T13:29:24.250 回答