这是我的用例,用户使用 websocket(带有订阅的 GraphQl)订阅我的流,我需要通过用户 ID 返回一个org.reactivestreams.Publisher
(应该是我的 kafka 主题订阅)过滤消息的实例。
为了说明,像这样:
/ **
* I don´t know how to get a instance of Publisher<Balance>
* It should be a consumer from a kafka topic
*/
fun balance(myStream: Publisher<Balance>, userId: String): Publisher<Balance> {
return myStream.filter { it.userId == userId }
}