5

我似乎在RSocket上找不到任何资源/教程,除了在 GitHub 上阅读他们的代码,我不明白。

我的服务器上有一个文件的路径:String serverFilePath;

我希望能够从我的客户端下载它(最好使用RSocket 的 Aeron 实现)。有谁知道如何使用 RSocket 做到这一点?

提前致谢。

4

2 回答 2

3

我在 RSocket 上工作,并编写了大部分 java 版本,包括 Aeron 传输。

我不建议目前使用 Aeron 实现。有几种方法可以发送文件:

  1. 使用 requestChannel 将数据推送到远程服务器。
  2. 使用 requestChannel 或 requestStream 将字节流式传输到客户端。

下面是一个使用 requestStream 的例子:

  public class FileCopy {

  public static void main(String... args) throws Exception {

    // Create a socket that receives incoming connections
    RSocketFactory.receive()
        .acceptor(
            new SocketAcceptor() {
              @Override
              // Create a new socket acceptor
              public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
                return Mono.just(
                    new AbstractRSocket() {
                      @Override
                      public Flux<Payload> requestStream(Payload payload) {
                        // Get the path of the file to copy
                        String path = payload.getDataUtf8();
                        SeekableByteChannel _channel = null;

                        try {
                          _channel = Files.newByteChannel(Paths.get(path), StandardOpenOption.READ);
                        } catch (IOException e) {
                          return Flux.error(e);
                        }

                        ReferenceCountUtil.safeRelease(payload);

                        SeekableByteChannel channel = _channel;
                        // Use Flux.generate to create a publisher that returns file at 1024 bytes
                        // at a time
                        return Flux.generate(
                            sink -> {
                              try {
                                ByteBuffer buffer = ByteBuffer.allocate(1024);
                                int read = channel.read(buffer);
                                buffer.flip();
                                sink.next(DefaultPayload.create(buffer));

                                if (read == -1) {
                                  channel.close();
                                  sink.complete();
                                }
                              } catch (Throwable t) {
                                sink.error(t);
                              }
                            });
                      }
                    });
              }
            })
        .transport(TcpServerTransport.create(9090))
        .start()
        .subscribe();

    String path = args[0];
    String dest = args[1];

    // Connect to a server
    RSocket client =
        RSocketFactory.connect().transport(TcpClientTransport.create(9090)).start().block();

    File f = new File(dest);
    f.createNewFile();

    // Open a channel to a new file
    SeekableByteChannel channel =
        Files.newByteChannel(f.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);

    // Request a stream of bytes
    client
        .requestStream(DefaultPayload.create(path))
        .doOnNext(
            payload -> {
              try {
                // Write the bytes received to the new file
                ByteBuffer data = payload.getData();
                channel.write(data);

                // Release the payload
                ReferenceCountUtil.safeRelease(payload);
              } catch (Exception e) {
                  throw new RuntimeException(e);
              }
            })
        // Block until all the bytes are received
        .blockLast();

    // Close the file you're writing too
    channel.close();
  }
}
于 2018-05-18T09:31:07.663 回答
1

现在这里有一个可恢复的文件传输示例

https://github.com/rsocket/rsocket-java/commit/d47629147dd1a4d41c7c8d5af3d80838e01d3ba5

于 2019-05-16T06:18:07.567 回答