如何发送 buf 然后接收 msg
方法
Mono<ByteBuf> send(ByteBuf buf){
// how to send the buf then receive a msg
}
我试图通过从连接出站发送一条消息并从入站接收一条消息然后返回一条消息单声道来实现此方法。但我只能在 then(Publisher) 方法中接收消息。似乎无法返回数据 Mono
我试过这个。
// the connecttion has been initialized before entering this method.
Mono.just(buf)
.doOnNext(data -> connection.outbound().sendObject(data).then().subscribe())
.then(connection
.inbound()
.receiveObject()
.single()
.map(RpcDataPackage.class::cast)
.map(RpcDataPackage::getData)
.map(data -> {
try {
return resCodec.decode(data);
} catch (IOException e) {
throw new RpcRequestException(e);
}
})
);
但它会阻塞直到连接超时
而且我尝试了另一个代码。我添加了一个handle
方法并将响应放入地图。然后我可以Mono.fromSupply()
在map.get(key) != null
.
它会阻塞线程。
.handle(((nettyInbound, nettyOutbound) -> nettyInbound
.receiveObject()
.map(RpcDataPackage.class::cast)
.doOnNext(pkg -> {
String responseKey = "a key"
responseMap.put(responseKey, pkg);
})
.then()))