我的 Spring Boot (WebFlux/R2DBC/Reactor Kafka) 应用程序中有以下消费者
@EventListener(ApplicationStartedEvent::class)
fun onMyEvent() {
kafkaReceiver
.receive()
.doOnNext { record ->
val myEvent = record.value()
myService.deleteSomethingFromDbById(myEvent.myId)
.thenEmpty {
record.receiverOffset().acknowledge()
}.subscribe()
}
.subscribe()
}
我想为 Kafka 和 DB 事务添加事务同步。阅读文档和一些 stakoverflow 问题后
- Spring Boot 中的事务同步与 Database+ kafka 示例
- Spring Kafka ChainedKafkaTransactionManager 不与 JPA Spring-data 事务同步
- Spring Kafka 中的事务同步
- Spring @Transactional 与跨多个数据源的事务
似乎ChainedKafkaTransactionManager
是要走的路。
但是下面的代码不起作用,因为 ChainedKafkaTransactionManager 需要 type 的事务管理器PlatformTransactionManager
。所以参数r2dbcTransactionManager
不被接受。
@Bean(name = ["chainedTransactionManager"])
fun chainedTransactionManager(
r2dbcTransactionManager: R2dbcTransactionManager,
kafkaTransactionManager: KafkaTransactionManager<*, *>
) = ChainedKafkaTransactionManager(kafkaTransactionManager, r2dbcTransactionManager)
还有另一种方法可以实现这一目标吗?