8

我正在尝试在我的 Spring Boot 应用程序中实现反应式 kafka 消费者,我正在查看这些示例: https ://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main /java/reactor/kafka/samples/SampleScenarios.java

看起来反应式 kafka 中还没有对 Spring 的支持

我了解卡夫卡侦听器如何在 Spring 中的非反应式卡夫卡 API 中工作:最简单的解决方案是为 ConcurrentKafkaListenerContainerFactory 和 ConsumerFactory 配置 bean,然后使用 @KafkaListener 注释和瞧

但我现在不确定如何在 Spring 中正确使用响应式 kafka。

基本上我需要一个主题的听众。我应该创建自己的某种循环或调度程序吗?或者,也许我错过了一些东西。任何人都可以分享他们的知识和最佳实践吗?

4

1 回答 1

0

我还没有现成的解决方案,但我正在尝试这个(Kotlin 代码,Spring Boot)。有人在这里发布了部分代码片段https://github.com/reactor/reactor-kafka/issues/100

@EventListener(ApplicationStartedEvent::class)
fun onSomeEvent() {
    kafkaReceiver
        .receive()
        .doOnNext { record ->
            val myEvent = record.value()
            processMyEvent(myEvent).thenEmpty {
                record.receiverOffset().acknowledge()
            }
        }
        .doOnError {
            /* todo */
        }
        .subscribe()
}

查看其他堆栈溢出问题。那里没有太多,但也许会给你一些想法

于 2020-12-31T08:09:08.767 回答