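Spring AMQP synchronous transaction rollback is not working. The database transactions in this code are not handled by Spring. I need to receive and send Spring AMQP messages within a single transaction. Below are snapshots of the relevant code. Please let me know if you need anything else.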

/////Connection Factory initialization

@Bean
public ConnectionFactory getConnectionFactory() {
    System.out.println("hello");
    configManager();
    String ip = ConfigManager.getQueueServerHost();
    System.out.println("IP Address : "+ip);
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(ip);
    connectionFactory.setUsername(ConfigManager.getQueueUserName());
    connectionFactory.setPassword(ConfigManager.getQueuePassword());
    connectionFactory.setPort(ConfigManager.getQueueServerPort());
    //connectionFactory.setPublisherReturns(true);
    //connectionFactory.setPublisherConfirms(true);
    return connectionFactory;

}

/////Rabbit Template initialization

@Bean
public RabbitTemplate producerRabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(getConnectionFactory());
    rabbitTemplate.setRoutingKey(ConfigManager.getProducerQueueName());
    rabbitTemplate.setQueue(ConfigManager.getProducerQueueName());
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setChannelTransacted(true);
    return rabbitTemplate;
}


/////Transactional Code

@Transactional(readOnly=false, rollbackFor=Exception.class)
public void processFile(RabbitTemplate rabbitTemplate)throws Exception{
    rabbitTemplate.setRoutingKey(ConfigManager.getConsumerQueueName());
    rabbitTemplate.setQueue(ConfigManager.getConsumerQueueName());

    Object messageObj = rabbitTemplate.receiveAndConvert();
    Message message = null;
    try{
        if(messageObj != null){
            if (messageObj instanceof Message){
                message = (Message)messageObj;
                System.out.println("Message received is '" + message.getFileName() + "' for Hospital "+message.getHospitalId());
                String newFileName = this.process(message.getFileName(), message.getHospitalId());
                this.sendMessage(newFileName, message.getHospitalId());
            }else{
                System.out.println("Unknown message received '" +  messageObj + "'");           
            }

        }
    }catch(Exception e){
        e.printStackTrace();
        throw e;
    }
}
1 Answer

This works for me; I just uploaded a Gist with my test case that shows it working.

I suggest you turn on TRACE-level logging to see all of the transaction activity (and compare it with the log I put in the Gist).
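
When comparing the logs, one thing worth checking is whether a transaction manager is actually in play. The question does not show one, so the following is only a sketch under that assumption: it registers a RabbitTransactionManager so that a @Transactional method has a Rabbit transaction for the channel-transacted RabbitTemplate to take part in. The class name RabbitTxConfig and the bean method name are hypothetical; the ConnectionFactory is assumed to be the bean defined in the question.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement;

// Hypothetical configuration class; not part of the original question or the Gist.
@Configuration
@EnableTransactionManagement // enables processing of @Transactional annotations
public class RabbitTxConfig {

    // Transaction manager backed by the RabbitMQ connection factory.
    // Assumption: the database work is not Spring-managed (as the question states),
    // so RabbitMQ is the only resource taking part in the transaction.
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}
```

With Spring's default proxy-based mode, @Transactional only takes effect when processFile is invoked on a Spring-managed bean from outside that bean; a call made through `this` bypasses the proxy and starts no transaction.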

Answered 2013-09-17T14:01:47.870