0

如何发送 buf 然后接收 msg

方法

Mono<ByteBuf> send(ByteBuf buf){
    // how to send the buf then receive a msg
}

我试图通过从连接出站发送一条消息并从入站接收一条消息然后返回一条消息单声道来实现此方法。但我只能在 then(Publisher) 方法中接收消息。似乎无法返回数据 Mono

我试过这个。

// the connecttion has been initialized before entering this method.

        Mono.just(buf)
                .doOnNext(data -> connection.outbound().sendObject(data).then().subscribe())
                .then(connection
                        .inbound()
                        .receiveObject()
                        .single()
                        .map(RpcDataPackage.class::cast)
                        .map(RpcDataPackage::getData)
                        .map(data -> {
                            try {
                                return resCodec.decode(data);
                            } catch (IOException e) {
                                throw new RpcRequestException(e);
                            }
                        })
                );

但它会阻塞直到连接超时

而且我尝试了另一个代码。我添加了一个handle方法并将响应放入地图。然后我可以Mono.fromSupply()map.get(key) != null.

它会阻塞线程。

                .handle(((nettyInbound, nettyOutbound) -> nettyInbound
                        .receiveObject()
                        .map(RpcDataPackage.class::cast)
                        .doOnNext(pkg -> {
                            String responseKey = "a key"

                            responseMap.put(responseKey, pkg);
                        })
                        .then()))
4

3 回答 3

0

您应该使用 NettyOutbound::then 的组合来监听写入完成和 Mono::then 在写入后读取您的 NettyInboud。

  Mono<String> resposeMono = TcpClient.create()
            .connect()
            .flatMap(connection -> connection.outbound().sendString(Mono.just("Hello!"))
                    .then()
                    .then(connection.inbound().receive().aggregate().asString())
                    .doOnTerminate(connection::dispose));

这将写“你好!” 到输出,从输入中读取所有字节作为字符串,然后处理连接。

于 2019-04-23T14:16:48.870 回答
0

我阅读了 Mono javadoc 并找到了 MonoSink。

Mono.create(monoSink -> {
  // some call
})

当入站收到对象响应时sink.success()

于 2019-04-16T10:40:27.277 回答
0

您没有指定您的期望。请参见下面的示例,它发送一些数据,然后接收服务器返回的内容。

    @Test
    public void test() {
        Connection connection =
                TcpClient.create()
                         .wiretap(true)
                         .host("example.com")
                         .port(80)
                         .connect()
                         .block();

        assertNotNull(connection);

        connection.outbound()
                  .sendObject(Unpooled.wrappedBuffer("test".getBytes()))
                  .then(connection.inbound()
                                  .receiveObject()
                                  .last()
                                  .doOnNext(System.out::println)
                                  .then())
                  .then()
                  .block();
    }
于 2019-04-15T13:27:43.947 回答