2

我正在尝试将 Transactionnal RabbitMQ 通道与 Spring-AMQP 一起使用,但我想真正吞下异常来记录它们并能够恢复它们。

使用 channelTransacted=true 会强制 Channel 也加入当前 transactionManager(在我的情况下为 Hibernate),这会导致提交异常被重新抛出 @Transactionnal 边界,导致上层失败而无法捕获并记录它。

我还尝试手动将发布附加到事务,以便仅在提交成功后执行:

public void publishFailSafeAfterSuccessfulTransaction(final String routingKey, final String message) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            try {
                rabbitTemplate.convertAndSend(routingKey, message);
            } catch (Exception exception) {
                logger.error("Error while publishing message to RabbitMQ ");
            }
        }
});

以这种方式使用:

Entity entity = save(entity);
publishFailSafeAfterSuccessfulTransaction("routingkey", "Entity was updated");

但在那种情况下,我不能使用 channelTransacted=true 因为它会将 registeringSynchronization 嵌套在另一个 registeringSynchronization 中并且根本无法调用......

有没有办法做到这一点?

更新:理想情况下,我想覆盖 ConnectionFactoryUtils 类中使用的 RabbitResourceSynchronization,但它是一个没有工厂实例化的私有类

TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder, connectionFactory, synched));
4

1 回答 1

1

我实施的解决方案是在主事务提交后在新事务中进行发布。

第一个电话:

Entity entity = save(entity);
publishFailSafeAfterSuccessfulTransaction("routingkey", "Entity was updated");

此方法注册以在主事务提交后进行发布。

public void publishFailSafeAfterSuccessfulTransaction(final String routingKey, final String event) {

    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            try {
                publishFailSafe(routingKey, event);
            } catch (Exception exception) {
                //Do some recovering
            }
        }
    });
}

在主事务提交后,这个将进行发布。当通道进行事务处理时,它将在提交该新事务时提交消息,并且仅使该事务失败,并且将在先前的方法中捕获错误。

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void publishFailSafe(String routingKey, String event) {
    try {
        rabbitTemplate.convertAndSend(routingKey.getRoutingKey(), event);
    } catch (Exception exception) {
        //Do some recovering
    }
}
于 2015-03-23T10:45:08.440 回答