我有几个通过 RabbitMQ 使用 Spring Boot 和 RPC 的教程。然而,一旦我尝试添加一个 Jackson JSON 消息转换器,它就崩溃了。
服务器成功接收到远程调用,所以我很有信心这不是客户端配置。
Exchange DATAFLOW_EXCHANGE
Routing Key dataflowRunner
Redelivered ○
Properties
reply_to: amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAAr0wAAAAAB.MmIZ6Htejtc1qB11G7BBQw==
priority: 0
delivery_mode: 2
headers:
__TypeId__: org.springframework.remoting.support.RemoteInvocation
content_encoding: UTF-8
content_type: application/json
Payload
675 bytes
Encoding: string
{"methodName":"run","parameterTypes":["dw.dataflow.Dataflow"],"arguments":[{ Valid Dataflow JSON Removed for Brevity } ]}
但是,会输出以下异常:
Caused by: org.springframework.messaging.converter.MessageConversionException:
No converter found to convert to class dw.dataflow.Dataflow, message=GenericMessage
[payload=RemoteInvocation: method name 'run'; parameter types [dw.dataflow.Dataflow], headers={amqp_receivedExchange=DATAFLOW_EXCHANGE, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAArRAAAAAQC.PA/bJ6lcUfaP3csAP5v5NA==, amqp_consumerQueue=DATAFLOW_QUEUE, amqp_redelivered=false, amqp_receivedRoutingKey=dataflowRunner, amqp_contentEncoding=UTF-8, amqp_deliveryMode=PERSISTENT, id=adb37c77-c0da-16bd-8df4-b739cfddf89f, amqp_consumerTag=amq.ctag-N_tFCc_Hp9UtQkiXl7FZ8g, contentType=application/json, __TypeId__=org.springframework.remoting.support.RemoteInvocation, timestamp=1462560945203}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:118)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:98)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:112)
... 12 common frames omitted
因此,在交付时,它知道它应该是一个 dw.dataflow.Dataflow 对象,它只是找不到转换器。但是,我在任何地方都定义了我的转换器。
服务器配置
@Configuration
@EnableRabbit
public class RabbitListenerConfiguration {
@Autowired
ConnectionFactory connectionFactory;
@Autowired
ObjectMapper jacksonObjectMapper;
@Bean
public TopicExchange exchange() {
return new TopicExchange("DATAFLOW_EXCHANGE", true, false);
}
@Bean
public Queue queue() {
return new Queue("DATAFLOW_QUEUE", true);
}
@Bean
public AmqpInvokerServiceExporter amqpInvokerServiceExporter() {
AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter() ;
exporter.setAmqpTemplate(rabbitTemplate());
exporter.setMessageConverter(jackson2JsonMessageConverter());
exporter.setServiceInterface(DataflowRunner.class);
exporter.setService(dataflowRunner());
return exporter ;
}
@Bean
public DataflowRunner dataflowRunner() {
return new DataflowRunnerServerImpl();
}
@Bean
public MessageConverter jackson2JsonMessageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setJsonObjectMapper(jacksonObjectMapper);
return converter;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jackson2JsonMessageConverter());
return template;
}
@Bean(name="rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jackson2JsonMessageConverter());
factory.setDefaultRequeueRejected(false);
return factory;
}
这是服务接口:
public interface DataflowRunner {
String run(Dataflow dataflow) throws Exception;
}
以及具体实现:
public class DataflowRunnerServerImpl implements DataflowRunner {
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues="DATAFLOW_QUEUE")
public String run(Dataflow dataflow) throws Exception {
// SNIP
}
对于笑容和傻笑,我还尝试使用以下注释配置服务器实现类,但它具有相同的错误:
@RabbitHandler
@RabbitListener(
bindings = @QueueBinding(key = "dataflowRunner",
value = @Queue(value = "DATAFLOW_QUEUE", durable = "true", autoDelete = "false", exclusive = "false"),
exchange = @Exchange(value = "DATAFLOW_EXCHANGE", durable = "true", autoDelete = "false", type = "topic")) )
public String run(Dataflow dataflow) throws Exception {
客户端配置
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
connectionFactory.setUsername(rabbitUser);
connectionFactory.setPassword(rabbitPassword);
connectionFactory.setAddresses(rabbitAddresses);
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jackson2MessageConverter());
return template;
}
是否有任何配置错误?我错过了什么?我在服务导出器和侦听器容器工厂上设置了转换器。
任何帮助和/或想法表示赞赏。