0

我尝试使用 spring-data-r2dbc 进行交易,但它不起作用。当两个查询工作时没关系,当第一个查询失败时,第二个查询不会执行。但是当第二个查询失败时,第一个不会回滚。

我在 1.0.0.RELEASE 中使用了 spring-data-r2dbc,在 0.8.1.RELEASE 中使用了 r2dbc-postgresql,在 0.8.1.RELEASE 中使用了 r2dbc-pool

@Transactional("reactiveTransactionManager")
fun insertPatchAndData(type: String, patch: JsonPatch, data: MyData, userId: String): Mono<Void> {
    return databaseClient.execute("""
                        insert data_patch(
                            id,
                            serial,
                            member_id,
                            system_id,
                            user_id,
                            type_patch,
                            version,
                            data_patch,
                            created_at,
                            modified_at
                        ) values (
                            :id,
                            nextval('data_patch_serial_seq'),
                            :memberId,
                            :systemId,
                            :userId,
                            :type,
                            :version,
                            :patch,
                            :createdAt,
                            :modifiedAt
                        )
                        """.trimIndent())
            .bind("id", data.id)
            .bind("memberId", data.memberId.toString())
            .bind("systemId", "default")
            .bind("userId", userId)
            .bind("type", type)
            .bind("version", 1)
            .bind("patch", Json.of(objectMapper.writeValueAsString(patch)))
            .bind("createdAt", data.createdAt)
            .bind("modifiedAt", OffsetDateTime.now())
            .fetch()
            .then(databaseClient.execute("""
                        insert into
                            data(id, member_id, json_data)
                        values
                            (:id, :memberId, :data)
                        on conflict (id) do
                        update set
                            json_data = :data
                    """.trimMargin()
                    ).bind("id", data.id)
                    .bind("memberId", data.memberId.toString())
                    .bind("data", Json.of(objectMapper.writeValueAsString(data)))
                    .fetch().rowsUpdated()).then()
}

@Configuration
@EnableTransactionManagement
class R2dbcConfiguration(private val r2dbcProperties: R2dbcProperties) {

    private lateinit var pool: ConnectionPool

    @Bean
    fun connectionFactory(): ConnectionFactory {
        val connectionFactoryOptions = ConnectionFactoryOptions.builder()
            .from(ConnectionFactoryOptions.parse(r2dbcProperties.url))
            .option(ConnectionFactoryOptions.USER, r2dbcProperties.username)
            .option(ConnectionFactoryOptions.PASSWORD, r2dbcProperties.password)
            .build()
        return ConnectionFactories.get(connectionFactoryOptions)
    }

    @Bean
    fun pool(connectionFactory: ConnectionFactory): ConnectionPool {
        val connectionPoolConfiguration = ConnectionPoolConfiguration.builder(connectionFactory)
            .initialSize(r2dbcProperties.pool.initialSize)
            .maxSize(r2dbcProperties.pool.maxSize)
            .build()
        return ConnectionPool(connectionPoolConfiguration)
    }

    @Bean
    fun reactiveTransactionManager(connectionFactory: ConnectionFactory): ReactiveTransactionManager {
        return R2dbcTransactionManager(connectionFactory)
    }

    @Bean
    fun client(pool: ConnectionPool): DatabaseClient {
        return DatabaseClient.create(pool)
    }

    @PreDestroy
    fun destroy() {
        pool.let { if (!it.isDisposed) it.dispose() }
    }
}

在调试时,我可以看到使用的 PooledConnection 的 inTransaction 属性为 false

4

0 回答 0