2

我有以下 RSocket 服务器

@Log4j2
public class NativeRsocketServerFnF {
  public static void main(String[] args) {
    RSocketFactory.receive()
        .frameDecoder(ZERO_COPY)
        .errorConsumer(log::error)
        .acceptor((setup, clientHandleRsocket) -> {
          return Mono.just(
              new AbstractRSocket() {
                @Override
                public Mono<Void> fireAndForget(Payload payload) {
                  CharSequence message = payload.data().readCharSequence(payload.data().readableBytes(), forName("UTF-8"));
                  payload.release();
                  log.info("> from client: {}", message);
                  return Mono.empty();
                }
              }
          );
        })
        .transport(TcpServerTransport.create(8000))
        .start()
        .block()
        .onClose()
        .block();
  }
}

和下面的 RSocket 客户端

@Log4j2
public class NativeRsocketClientFnF {
  public static void main(String[] args) {
    RSocketFactory.connect()
        .frameDecoder(ZERO_COPY)
        .errorConsumer(log::error)
        .transport(TcpClientTransport.create(8000))
        .start()
        .flatMap(rSocket -> rSocket.fireAndForget(DefaultPayload.create("ping")))
        .block();
  }
}

如您所见,我正在尝试将“ping”作为有效负载数据从客户端发送到服务器

当我第一次启动服务器并启动客户端时,我看到了> from client: ping

如果我再次重新启动客户端,我在服务器上看不到任何消息。断点甚至没有在服务器上命中

我的理解是 Fire and Forget 只是简单地发送数据 & 不费心等待 & 看看服务器是否成功处理数据,但在我的情况下,服务器本身没有在客户端的后续运行中接收数据(最好新客户)

有什么我想念的吗?

我正在使用&1.0.0-RC5的版本rsocket-corersocket-transport-netty

操作系统:Ubuntu 16.04

4

1 回答 1

3

您对 Fire and Forget 模式的理解是正确的。

io.rsocket.RSocketRequester#fireAndForget在将数据发送到网络之前,问题已成功完成。当您阻止它并退出您的客户端应用程序时,连接将关闭。

io.rsocket.RSocketRequester#handleFireAndForget

    return emptyUnicastMono()
        .doOnSubscribe(
            new OnceConsumer<Subscription>() {
              @Override
              public void acceptOnce(@Nonnull Subscription subscription) {
                ByteBuf requestFrame =
                    RequestFireAndForgetFrameFlyweight.encode(
                        allocator,
                        streamId,
                        false,
                        payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
                        payload.sliceData().retain());
                payload.release();

                sendProcessor.onNext(requestFrame);
              }
            });

sendProcessor是中间通量处理器。它被实际连接使用,并且向网络发送请求帧可能会延迟。

您可以在客户端调用.then(Mono.delay(Duration.ofMillis(100)))之前添加以进行测试。.block()我认为大多数真正的应用程序在调用 fireAndForget 后不会立即退出。

于 2019-10-23T11:06:09.460 回答