我有以下场景,我的程序使用阻塞队列异步处理消息。有多个 RSocket 客户端希望接收此消息。我的设计是这样一种方式,当消息到达阻塞队列时,绑定到 Flux 的流将发出。我已尝试按如下方式实现此要求,但客户端未收到任何响应。但是,我可以看到 Stream 供应商被正确触发。
有人可以帮忙吗?
@MessageMapping("addListenerHook")
public Flux<QueryResult> addListenerHook(String clientName){
System.out.println("Adding Listener:"+clientName);
BlockingQueue<QueryResult> listenerQ = new LinkedBlockingQueue<>();
Datalistener.register(clientName,listenerQ);
return Flux.fromStream(
()-> Stream.generate(()->streamValue(listenerQ))).map(q->{
System.out.println("I got an event : "+q.getResult());
return q;
});
}
private QueryResult streamValue(BlockingQueue<QueryResult> inStream){
try{
return inStream.take();
}catch(Exception e){
return null;
}
}