
I'm trying to write a wrapper for the Vert.x Web Client that loads a response body from the server as a reactive-streams Publisher:

import java.nio.ByteBuffer;

import org.reactivestreams.Publisher;
import io.vertx.reactivex.ext.web.client.WebClient;

interface Storage {
  Publisher<ByteBuffer> load(String key);
}

class WebStorage implements Storage {
  private final WebClient client;

  public WebStorage(final WebClient client) {
    this.client = client;
  }

  @Override
  public Publisher<ByteBuffer> load(final String key) {
    return client.get(String.format("https://myhost/path?query=%s", key))
      .rxSend()
      .toFlowable()
      // resp.body() is the fully buffered body, so this emits a single ByteBuffer
      .map(resp -> ByteBuffer.wrap(resp.body().getBytes()));
  }
}

This solution is not correct, because it reads all of the body bytes in a blocking way via the getBytes() call.

Is it possible to read the response from the Vert.x WebClient in chunks and convert it to a Publisher (or an Rx Flowable)?
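
To make clear what I mean by reading in chunks, here is a dependency-free sketch using only the JDK (Java 11+). It is not Vert.x code: an InputStream stands in for the response body, java.util.concurrent.Flow stands in for org.reactivestreams.Publisher, and the class and method names (ChunkedBodySketch, streamInChunks, SizeCollector) are made up for illustration. The point is only that the subscriber receives several small ByteBuffers instead of one buffer holding the whole body.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class ChunkedBodySketch {

    /** Hypothetical subscriber: requests one chunk at a time and records its size. */
    static final class SizeCollector implements Flow.Subscriber<ByteBuffer> {
        final List<Integer> sizes = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // ask for the first chunk only
        }

        @Override
        public void onNext(ByteBuffer chunk) {
            sizes.add(chunk.remaining());
            subscription.request(1); // ask for the next chunk once this one is handled
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /**
     * Reads the stand-in body at most chunkSize bytes at a time, publishes each
     * chunk as its own ByteBuffer, and returns the chunk sizes the subscriber saw.
     */
    static List<Integer> streamInChunks(InputStream body, int chunkSize)
            throws IOException, InterruptedException {
        SizeCollector collector = new SizeCollector();
        try (SubmissionPublisher<ByteBuffer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(collector);
            byte[] chunk;
            // readNBytes returns an empty array at end of stream
            while ((chunk = body.readNBytes(chunkSize)).length > 0) {
                publisher.submit(ByteBuffer.wrap(chunk));
            }
        } // close() signals onComplete after the submitted chunks are delivered
        collector.done.await();
        return collector.sizes;
    }

    public static void main(String[] args) throws Exception {
        // A 10-byte stand-in body read 4 bytes at a time
        System.out.println(streamInChunks(new ByteArrayInputStream(new byte[10]), 4));
    }
}
```

What I am looking for is the Vert.x equivalent, where the chunks come from the HTTP response as they arrive rather than from an InputStream.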

2 Answers

The Vert.x Web Client is not designed to stream the response body. It buffers the content by design.

If you want to stream the content, you can use the underlying HTTP client, which is more flexible.

answered 2020-03-26T11:12:41.357

I think you can use BodyCodec.pipe:

import io.reactivex.Flowable;
import io.vertx.ext.reactivestreams.ReactiveWriteStream;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.streams.WriteStream;
import io.vertx.reactivex.ext.web.client.WebClient;
import io.vertx.reactivex.ext.web.codec.BodyCodec;
import org.reactivestreams.Publisher;

import java.nio.ByteBuffer;

interface Storage {
    Publisher<ByteBuffer> load(String key);
}

class WebStorage implements Storage {
    // Note: Vertx.vertx() creates a new Vert.x instance for this class
    private final Vertx vertx = Vertx.vertx();
    private final WebClient client;

    public WebStorage(final WebClient client) {
        this.client = client;
    }

    @Override
    public Publisher<ByteBuffer> load(final String key) {
        // A WriteStream that is also a reactive-streams Publisher of whatever is written to it
        final ReactiveWriteStream<Buffer> stream = ReactiveWriteStream.writeStream(vertx.getDelegate());
        client.get(String.format("https://myhost/path?query=%s", key))
            // Pipe the response body into the stream instead of buffering it
            .as(BodyCodec.pipe(WriteStream.newInstance(stream)))
            // The request is sent here, when load() is called, not when the result is subscribed to
            .rxSend().subscribe();
        return Flowable.fromPublisher(stream).map(buffer -> ByteBuffer.wrap(buffer.getBytes()));
    }
}
answered 2020-03-26T11:29:23.553