1

我正在尝试使用 spring-data-r2dbc 存储库结合 TransactionalDatabaseClient 来实现事务:

class SongService(
    private val songRepo: SongRepo,
    private val databaseClient: DatabaseClient

){
    private val tdbc = databaseClient as TransactionalDatabaseClient

    ...
    ...
    fun save(song: Song){
        return tdbc.inTransaction{ 
            songRepo
                .save(mapRow(song, albumId)) //Mapping to a row representation
                .delayUntil { savedSong -> tdbc.execute.sql(...).fetch.rowsUpdated() } //saving a many to many relation
                .map(::mapSong) //Mapping back to actual song and retrieve the relationship data.
        }
    }

}

我目前有@Configuration一个@EnableR2dbcRepositoriesAbstractR2dbcConfiguration. 在这里,我重写了databaseClient返回一个TransactionalDatabaseClient. 这应该与 SongService 类中的实例相同。

在仅订阅和打印的测试中运行代码时,我得到org.springframework.transaction.NoTransactionException: ReactiveTransactionSynchronization not active并且不返回关系数据。

但是,当使用项目 Reactors stepverifier 时,我得到java.lang.IllegalStateException: Connection is closed. 同样在这种情况下,不返回关系数据。

只是为了记录,我已经看到了https://github.com/spring-projects/spring-data-r2dbc/issues/44

4

1 回答 1

0

这是一个有效的 Java 示例:

@Autowired TransactionalDatabaseClient txClient;
@Autowired Mono<Connection> connection;
//You Can also use: @Autowired Mono<? extends Publisher> connectionPublisher;

public Flux<Void> example {
txClient.enableTransactionSynchronization(connection);
// Or, txClient.enableTransactionSynchronization(connectionPublisher);

Flux<AuditConfigByClub> audits = txClient.inTransaction(tx -> {
  txClient.beginTransaction();
  return tx.execute().sql("SELECT * FROM audit.items")
  .as(Item.class)
  .fetch()
  .all();
}).doOnTerminate(() -> {
  txClient.commitTransaction();
});
txClient.commitTransaction();

audits.subscribe(item -> System.out.println("anItem: " + item));
  return Flux.empty()
}

我刚开始反应,所以不太确定我在用我的回调做什么哈哈。但是我决定继续使用TransactionalDatabaseClientDatabaseClient或者Connection因为我将在 R2dbc 处于当前状态时使用我可以获得的所有实用程序。

在您的代码中,您实际上实例化了一个 Connection 对象吗?如果是这样,我认为您会在配置中完成它。它可以像 DatabaseClient 一样在整个应用程序中使用,但它稍微复杂一些。

如果不:

@Bean
@Override // I also used abstract config
public ConnectionFactory connectionFactory() {
  ...
}

@Bean 
TransactionalDatabaseClient txClient() {
  ...
}

//TransactionalDatabaseClient will take either of these as arg in 
//#enableTransactionSynchronization method

@Bean
public Publisher<? extends Connection> connectionPublisher() {
  return connectionFactory().create();
}

@Bean
public Mono<Connection> connection() {
  return = Mono.from(connectionFactory().create());
}

如果您在转换到 Kotlin 时遇到问题,可以使用另一种方法来启用同步:

// From what I understand, this is a useful way to move between 
// transactions within a single subscription
TransactionResources resources = TransactionResources.create();
resources.registerResource(Resource.class, resource);

ConnectionFactoryUtils
  .currentReactiveTransactionSynchronization()
  .subscribe(currentTx -> sync.registerTransaction(Tx));

希望这对 Kotlin 来说翻译得很好。

于 2019-03-28T06:23:02.987 回答