Spring AMQP 同步事务回滚不起作用。这里源中的数据库事务不由 Spring 处理。我需要在一个事务中接收和发送 Spring AMQP 消息。以下是相关代码的快照。如果您需要其他任何东西,请告诉我。
/////Connection Factory initialization
@Bean
public ConnectionFactory getConnectionFactory() {
System.out.println("hello");
configManager();
String ip = ConfigManager.getQueueServerHost();
System.out.println("IP Address : "+ip);
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(ip);
connectionFactory.setUsername(ConfigManager.getQueueUserName());
connectionFactory.setPassword(ConfigManager.getQueuePassword());
connectionFactory.setPort(ConfigManager.getQueueServerPort());
//connectionFactory.setPublisherReturns(true);
//connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
/////Rabbit Template initialization
@Bean
public RabbitTemplate producerRabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(getConnectionFactory());
rabbitTemplate.setRoutingKey(ConfigManager.getProducerQueueName());
rabbitTemplate.setQueue(ConfigManager.getProducerQueueName());
rabbitTemplate.setMandatory(true);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
/////Transactional Code
@Transactional(readOnly=false, rollbackFor=Exception.class)
public void processFile(RabbitTemplate rabbitTemplate)throws Exception{
rabbitTemplate.setRoutingKey(ConfigManager.getConsumerQueueName());
rabbitTemplate.setQueue(ConfigManager.getConsumerQueueName());
Object messageObj = rabbitTemplate.receiveAndConvert();
Message message = null;
try{
if(messageObj != null){
if (messageObj instanceof Message){
message = (Message)messageObj;
System.out.println("Message received is '" + message.getFileName() + "' for Hospital "+message.getHospitalId());
String newFileName = this.process(message.getFileName(), message.getHospitalId());
this.sendMessage(newFileName, message.getHospitalId());
}else{
System.out.println("Unknown message received '" + messageObj + "'");
}
}
}catch(Exception e){
e.printStackTrace();
throw e;
}
}