我尝试使用 spring-data-r2dbc 进行交易,但它不起作用。当两个查询工作时没关系,当第一个查询失败时,第二个查询不会执行。但是当第二个查询失败时,第一个不会回滚。
我在 1.0.0.RELEASE 中使用了 spring-data-r2dbc,在 0.8.1.RELEASE 中使用了 r2dbc-postgresql,在 0.8.1.RELEASE 中使用了 r2dbc-pool
@Transactional("reactiveTransactionManager")
fun insertPatchAndData(type: String, patch: JsonPatch, data: MyData, userId: String): Mono<Void> {
return databaseClient.execute("""
insert data_patch(
id,
serial,
member_id,
system_id,
user_id,
type_patch,
version,
data_patch,
created_at,
modified_at
) values (
:id,
nextval('data_patch_serial_seq'),
:memberId,
:systemId,
:userId,
:type,
:version,
:patch,
:createdAt,
:modifiedAt
)
""".trimIndent())
.bind("id", data.id)
.bind("memberId", data.memberId.toString())
.bind("systemId", "default")
.bind("userId", userId)
.bind("type", type)
.bind("version", 1)
.bind("patch", Json.of(objectMapper.writeValueAsString(patch)))
.bind("createdAt", data.createdAt)
.bind("modifiedAt", OffsetDateTime.now())
.fetch()
.then(databaseClient.execute("""
insert into
data(id, member_id, json_data)
values
(:id, :memberId, :data)
on conflict (id) do
update set
json_data = :data
""".trimMargin()
).bind("id", data.id)
.bind("memberId", data.memberId.toString())
.bind("data", Json.of(objectMapper.writeValueAsString(data)))
.fetch().rowsUpdated()).then()
}
@Configuration
@EnableTransactionManagement
class R2dbcConfiguration(private val r2dbcProperties: R2dbcProperties) {
private lateinit var pool: ConnectionPool
@Bean
fun connectionFactory(): ConnectionFactory {
val connectionFactoryOptions = ConnectionFactoryOptions.builder()
.from(ConnectionFactoryOptions.parse(r2dbcProperties.url))
.option(ConnectionFactoryOptions.USER, r2dbcProperties.username)
.option(ConnectionFactoryOptions.PASSWORD, r2dbcProperties.password)
.build()
return ConnectionFactories.get(connectionFactoryOptions)
}
@Bean
fun pool(connectionFactory: ConnectionFactory): ConnectionPool {
val connectionPoolConfiguration = ConnectionPoolConfiguration.builder(connectionFactory)
.initialSize(r2dbcProperties.pool.initialSize)
.maxSize(r2dbcProperties.pool.maxSize)
.build()
return ConnectionPool(connectionPoolConfiguration)
}
@Bean
fun reactiveTransactionManager(connectionFactory: ConnectionFactory): ReactiveTransactionManager {
return R2dbcTransactionManager(connectionFactory)
}
@Bean
fun client(pool: ConnectionPool): DatabaseClient {
return DatabaseClient.create(pool)
}
@PreDestroy
fun destroy() {
pool.let { if (!it.isDisposed) it.dispose() }
}
}
在调试时,我可以看到使用的 PooledConnection 的 inTransaction 属性为 false