我正在尝试将 Spring 与 RabbitMQ 集成,使用 RabbitMQ 支持的 Spring 集成通道。(由于某种原因,这似乎几乎没有记录,这是新的吗?)。
为此,我似乎可以使用 AmqpChannelFactoryBean 创建一个频道。要设置消息转换,我使用 Jackson2JsonMessageConverter。
当我使用带有 POJO 有效负载的 GenericMessage 时,它拒绝从 Java 反序列化它,主要是因为它不知道类型。我本来希望该类型会自动放在标题上,但标题上只有__TypeId__=org.springframework.messaging.support.GenericMessage
.
在 Spring Boot 中,我的配置类如下所示:
@Configuration
public class IntegrationConfiguration {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpChannelFactoryBean myActivateOutChannel(CachingConnectionFactory connectionFactory,
MessageConverter messageConverter) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("myActivateOut");
factoryBean.setPubSub(false);
factoryBean.setAcknowledgeMode(AcknowledgeMode.AUTO);
factoryBean.setDefaultDeliveryMode(MessageDeliveryMode.PERSISTENT);
factoryBean.setMessageConverter(messageConverter);
return factoryBean;
}
@Bean
@ServiceActivator(inputChannel = "bsnkActivateOutChannel", autoStartup="true")
public MessageHandler mqttOutbound() {
return m -> System.out.println(m);
}
}
发送是这样完成的:
private final MessageChannel myActivateOutChannel;
@Autowired
public MySender(MessageChannel myActivateOutChannel) {
this.myActivateOutChannel = myActivateOutChannel;
}
@Override
public void run(ApplicationArguments args) throws Exception {
MyPojo pojo = new MyPojo();
Message<MyPojo> msg = new GenericMessage<>(pojo);
myActivateOutChannel.send(msg);
}
如果我设置自己的类映射器,事情就会正常工作。但是如果我设置这样的东西,我将不得不使用许多 MessageConverters。例如
converter.setClassMapper(new ClassMapper() {
@Override
public void fromClass(Class< ? > clazz, MessageProperties properties) {
}
@Override
public Class< ? > toClass(MessageProperties properties) {
return MyPojo.class;
}
});
我用错了吗?我错过了一些配置吗?还有其他建议吗?
谢谢!!:)
注意:看更多的东西,我猜“Spring 集成”的方式是在每一侧添加一个 Spring 集成 JSON 转换器,这意味着每个 RabbitMQ 队列还添加两个额外的直接通道?这对我来说感觉不对,因为那时我有三倍的通道(6!用于输入/输出),但也许这就是应该使用框架的方式?将所有简单的步骤与直接渠道相结合?(在这种情况下,我是否保留了 RabbitMQ 通道提供的持久性?或者如果我想要的话,我是否需要一些事务机制?或者它是直接通道如何工作所固有的?)
我还注意到现在有一个 Spring-integration MessageConverter 和一个 Spring-amqp MessageConverter。后者是我用过的。另一个会按照我想要的方式工作吗?快速浏览一下代码表明它没有将对象类型存储在消息头中?