0

这是我的用例,用户使用 websocket(带有订阅的 GraphQl)订阅我的流,我需要通过用户 ID 返回一个org.reactivestreams.Publisher(应该是我的 kafka 主题订阅)过滤消息的实例。

为了说明,像这样:

/ **
  *  I don´t know how to get a instance of Publisher<Balance>
  *  It should be a consumer from a kafka topic
  */
fun balance(myStream: Publisher<Balance>, userId: String): Publisher<Balance> {
    return myStream.filter { it.userId == userId }
}
4

1 回答 1

0

也许您需要编写一个 Spring Cloud Stream 消费者,然后以编程方式将其发布到 WebSocket。类似的东西

public Consumer<Flux<Balance>> myStream() {

  //filter here and then publish to websocket.

}

是一个 WebSocket 接收器实现的示例,您可以将其用作指南,但这不是响应式的。

于 2020-05-04T18:25:01.943 回答