0

我的 Spring Boot (WebFlux/R2DBC/Reactor Kafka) 应用程序中有以下消费者

    @EventListener(ApplicationStartedEvent::class)
    fun onMyEvent() {
        kafkaReceiver
            .receive()
            .doOnNext { record ->
                val myEvent = record.value()
                myService.deleteSomethingFromDbById(myEvent.myId)
                .thenEmpty {
                    record.receiverOffset().acknowledge()
                }.subscribe()
            }
            .subscribe()
    }

我想为 Kafka 和 DB 事务添加事务同步。阅读文档和一些 stakoverflow 问题后

似乎ChainedKafkaTransactionManager是要走的路。

但是下面的代码不起作用,因为 ChainedKafkaTransactionManager 需要 type 的事务管理器PlatformTransactionManager。所以参数r2dbcTransactionManager不被接受。

    @Bean(name = ["chainedTransactionManager"])
    fun chainedTransactionManager(
        r2dbcTransactionManager: R2dbcTransactionManager,
        kafkaTransactionManager: KafkaTransactionManager<*, *>
    ) = ChainedKafkaTransactionManager(kafkaTransactionManager, r2dbcTransactionManager)

还有另一种方法可以实现这一目标吗?

4

1 回答 1

0

为 Kafka 消费者链接事务是没有意义的。仅适用于发布者,即传出消息。

但是您应该确保不要多次处理同一消息。

@EventListener(ApplicationStartedEvent::class)
fun onMyEvent() {
  kafkaReceiver.receive()
    // Make sure to have unique index on (topic, partition, offset)
    // so you receive a ConstraintViolationException
    .flatMap { r ->
      val msg = ConsumedMessage(r.topic(), r.partition(), r.offset())
      consumedMessagesRepository.save(msg).thenReturn(r)
    }
    .onErrorContinue {ex, r -> log.warn("Duplicate msg") }
    .flatMap { r ->
      myService.deleteSomethingFromDbById(r.value().myId)
        .thenReturn(r)
    }
    .flatMap { r ->
      r.receiverOffset().commit()
    }
    .subscribe()
}
于 2021-07-07T21:15:37.120 回答