0

我正在尝试将 Spring 与 RabbitMQ 集成,使用 RabbitMQ 支持的 Spring 集成通道。(由于某种原因,这似乎几乎没有记录,这是新的吗?)。

为此,我似乎可以使用 AmqpChannelFactoryBean 创建一个频道。要设置消息转换,我使用 Jackson2JsonMessageConverter。

当我使用带有 POJO 有效负载的 GenericMessage 时,它​​拒绝从 Java 反序列化它,主要是因为它不知道类型。我本来希望该类型会自动放在标题上,但标题上只有__TypeId__=org.springframework.messaging.support.GenericMessage.

在 Spring Boot 中,我的配置类如下所示:

@Configuration
public class IntegrationConfiguration {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpChannelFactoryBean myActivateOutChannel(CachingConnectionFactory connectionFactory,
        MessageConverter messageConverter) {

        AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
        factoryBean.setConnectionFactory(connectionFactory);
        factoryBean.setQueueName("myActivateOut");
        factoryBean.setPubSub(false);
        factoryBean.setAcknowledgeMode(AcknowledgeMode.AUTO);
        factoryBean.setDefaultDeliveryMode(MessageDeliveryMode.PERSISTENT);
        factoryBean.setMessageConverter(messageConverter);
        return factoryBean;
    }

    @Bean
    @ServiceActivator(inputChannel = "bsnkActivateOutChannel", autoStartup="true")
    public MessageHandler mqttOutbound() {

        return m -> System.out.println(m);
    }

}

发送是这样完成的:

private final MessageChannel myActivateOutChannel;

@Autowired
public MySender(MessageChannel myActivateOutChannel) {
    this.myActivateOutChannel = myActivateOutChannel;
}

@Override
public void run(ApplicationArguments args) throws Exception {
    MyPojo pojo = new MyPojo();
    Message<MyPojo> msg = new GenericMessage<>(pojo);

    myActivateOutChannel.send(msg);
}

如果我设置自己的类映射器,事情就会正常工作。但是如果我设置这样的东西,我将不得不使用许多 MessageConverters。例如

    converter.setClassMapper(new ClassMapper() {

        @Override
        public void fromClass(Class< ? > clazz, MessageProperties properties) {
        }

        @Override
        public Class< ? > toClass(MessageProperties properties) {
            return MyPojo.class;
        }

    });

我用错了吗?我错过了一些配置吗?还有其他建议吗?

谢谢!!:)

注意:看更多的东西,我猜“Spring 集成”的方式是在每一侧添加一个 Spring 集成 JSON 转换器,这意味着每个 RabbitMQ 队列还添加两个额外的直接通道?这对我来说感觉不对,因为那时我有三倍的通道(6!用于输入/输出),但也许这就是应该使用框架的方式?将所有简单的步骤与直接渠道相结合?(在这种情况下,我是否保留了 RabbitMQ 通道提供的持久性?或者如果我想要的话,我是否需要一些事务机制?或者它是直接通道如何工作所固有的?)

我还注意到现在有一​​个 Spring-integration MessageConverter 和一个 Spring-amqp MessageConverter。后者是我用过的。另一个会按照我想要的方式工作吗?快速浏览一下代码表明它没有将对象类型存储在消息头中?

4

1 回答 1

1

在 4.3 版本之前,amqp 支持的通道仅支持可序列化的有效负载;解决方法是改用通道适配器(支持映射)。

INT-3975引入了一个新属性extractPayload,该属性导致消息标头映射到 rabbitmq 标头,并且消息正文只是有效负载而不是序列化的GenericMessage.

设置extractPayload为 true 应该可以解决您的问题。

于 2017-10-10T13:35:11.030 回答