4

我有以下场景,我的程序使用阻塞队列异步处理消息。有多个 RSocket 客户端希望接收此消息。我的设计是这样一种方式,当消息到达阻塞队列时,绑定到 Flux 的流将发出。我已尝试按如下方式实现此要求,但客户端未收到任何响应。但是,我可以看到 Stream 供应商被正确触发。

有人可以帮忙吗?

  @MessageMapping("addListenerHook")
public Flux<QueryResult> addListenerHook(String clientName){
    System.out.println("Adding Listener:"+clientName);
    BlockingQueue<QueryResult> listenerQ = new LinkedBlockingQueue<>();
    Datalistener.register(clientName,listenerQ);

    return Flux.fromStream(
            ()-> Stream.generate(()->streamValue(listenerQ))).map(q->{
        System.out.println("I got an event : "+q.getResult());
        return q;
    });
}


private QueryResult streamValue(BlockingQueue<QueryResult> inStream){
    try{
        return inStream.take();
    }catch(Exception e){
        return null;
    }
}
4

1 回答 1

1

由于阻塞 API,这很难简单而干净地解决。我认为这就是为什么这里没有简单的桥接 API 来帮助您实现这一点的原因。您应该想出一个干净的解决方案,首先将 BlockingQueue 变成 Flux。然后 spring-boot 部分变为非事件。

这就是为什么正确的解决方案可能涉及自定义 BlockingQueue 实现,例如https://www.nurkiewicz.com/2015/07/sumption-javautilconcurrentblockingque.html中的 ObservableQueue

另一种方法是如何从阻塞队列中创建反应器通量?

如果您需要保留 LinkedBlockingQueue,则开始的解决方案可能类似于以下内容。

  val f = flux<String> {
    val listenerQ = LinkedBlockingQueue<QueryResult>()
    Datalistener.register(clientName,listenerQ);
    
    while (true) {
      send(bq.take())
    }
  }.subscribeOn(Schedulers.elastic())

使用 Flux 这样的 API,您绝对应该在订阅之前避免任何副作用,因此在方法体内部之前不要注册您的侦听器。但是您将需要改进此示例以处理取消,或者您取消侦听器并中断执行拍摄的线程。

于 2020-12-16T09:30:27.330 回答