我正在尝试遵循Micronaut Kafka 指南。它显示了这段代码:
Single<Book> sendBook(
@KafkaKey String author,
Single<Book> book
);
我尝试以这种方式实施但没有成功
制片人
package com.tolearn.producer
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Header
import io.reactivex.Single
@KafkaClient(
id = "demo-producer",
acks = KafkaClient.Acknowledge.ALL)
@Header(name = "X-Token", value = "\${my.application.token}")
public interface DemoProducer {
//Reactive and Non-Blocking
@Topic("demotopic")
fun sendDemoMsgReactive(
@KafkaKey key: String?,
msg: Single<String>): Single<String?>?
}
并从服务层调用它
package com.tolearn.service
import com.tolearn.producer.DemoProducer
import io.reactivex.Single
import io.reactivex.SingleOnSubscribe
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
@Singleton
class DemoService {
@Inject
@Named("dp")
lateinit var dp : DemoProducer
fun postDemo(key: String, msg: String){
//Reactive and No-blocking
val singleReturned:Single<String> = dp.sendDemoMsgReactive(key, SingleOnSubscribe<String> msg ).subscribe()
singleReturned.doOnSuccess{
print("ok")
}
singleReturned.doOnError ((e)->print(e))
}
}
基本上,我想要的是使用 io.reactivex.Single 向 kafka“无阻塞”样式发布消息。我知道我必须订阅然后编写两个回调:onSuccess 和 onError。当然,我缺少一些关于 ReactiveX 的基本概念。请注意,任何线索将不胜感激。