1

Java 11 引入了一个新包java.net.http,用于发出 HTTP 请求。对于一般用途,它非常简单。

java.net.http我的问题是:当客户端收到每个块时,我如何使用它来处理块响应?

java.http.net包含一个BodySubscriber似乎是我想要的反应式,但我找不到如何使用它的示例。

http_get_demo.py

下面是一个 python 实现,当它们到达时打印块,我想用 java.net.http 做同样的事情:

import argparse
import requests


def main(url: str):
    with requests.get(url, stream=True) as r:
        for c in r.iter_content(chunk_size=1):
            print(c.decode("UTF-8"), end="")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Read from a URL and print as text as chunks arrive")
    parser.add_argument('url', type=str, help="A URL to read from")
    args = parser.parse_args()

    main(args.url)

HttpGetDemo.java

为了完整起见,这里有一个使用 java.net.http 发出阻塞请求的简单示例:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;

public class HttpGetDemo {

  public static void main(String[] args) throws Exception {

    var request = HttpRequest.newBuilder()
            .uri(URI.create(args[0]))
            .build();

    var bodyHandler = HttpResponse.BodyHandlers
            .ofString();

    var client = HttpClient.newHttpClient();
    var response = client.send(request, bodyHandler);
    System.out.println(response.body());

  }
}

HttpAsyncGetDemo.java

这是发出非阻塞/异步请求的示例:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;

/**
 * ReadChunked
 */
public class HttpAsyncGetDemo {

  public static void main(String[] args) throws Exception {

    var request = HttpRequest.newBuilder()
            .uri(URI.create(args[0]))
            .build();

    var bodyHandler = HttpResponse.BodyHandlers
            .ofString();

    var client = HttpClient.newHttpClient();

    client.sendAsync(request, bodyHandler)
            .thenApply(HttpResponse::body)
            .thenAccept(System.out::println)
            .join();

  }
}
4

4 回答 4

2

您可以在 s 出现时打印ByteBuffer它们,但不能保证 aByteBuffer对应于一个块。块由堆栈处理。将为每个块推送一个ByteBuffer切片 - 但如果缓冲区中没有足够的空间剩余,则将推送部分块。消费者看到的只是一个ByteBuffer包含数据的 s 流。所以你可以做的是ByteBuffer在它们到来时打印它们,但你不能保证它们与服务器发送的每个块完全对应。

注意:如果您的请求正文是基于文本的,那么您可以使用 BodyHandlers.fromLineSubscriber(Subscriber<? super String> subscriber)自定义Subscriber<String>来打印每一行。使用BodyHandlers.fromLineSubscriber响应标头中指示的字符集将字节解码为字符,如果需要,缓冲字节直到它们可以被解码(如果文本包含在多个字节上编码的字符,则 ByteBuffer 可能在编码序列的中间结束) , 并在行边界处拆分它们。对于文本中的每一行,将调用一次 Subscriber::onNext 方法。请参阅https://download.java.net/java/early_access/jdk11/docs/api/java.net.http/java/net/http/HttpResponse.BodyHandlers.html#fromLineSubscriber(java.util.concurrent.Flow.Subscriber )了解更多信息。

于 2018-10-12T17:03:34.303 回答
2

python 代码不能确保响应主体数据一次可用一个HTTP 块。它只是向应用程序提供少量数据,从而减少应用程序级别消耗的内存量(它可以在堆栈中缓冲较低)。Java 11 HTTP 客户端支持通过流式主体处理程序之一进行流式传输,HttpResponse.BodyHandlers: ofInputStream, ofByteArrayConsumer,asLines等。

或者编写您自己的处理程序/订阅者,如下所示: https ://www.youtube.com/watch?v=qiaC0QMLz5Y

于 2018-10-12T19:37:53.180 回答
1

感谢@pavel 和@chegar999 的部分回答。他们引导我找到我的解决方案。

概述

我想出的解决方案如下。基本上,解决方案是使用自定义java.net.http.HttpResponse.BodySubscriber. BodySubscriber 包含响应式方法(onSubscribe、onNext、onError 和 onComplete)和 getBody 方法,该方法基本上返回 java CompletableFuture,最终将生成 HTTP 请求的正文。拥有 BodySubscriber 后,您可以像这样使用它:

HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create(uri))
    .build();

return client.sendAsync(request, responseInfo -> new StringSubscriber())
    .whenComplete((r, t) -> System.out.println("--- Status code " + r.statusCode()))
    .thenApply(HttpResponse::body);

注意这一行:

client.sendAsync(request, responseInfo -> new StringSubscriber())

这就是我们注册自定义 BodySubscriber 的地方;在这种情况下,我的自定义类名为StringSubscriber.

CustomSubscriber.java

这是一个完整的工作示例。使用 Java 11,您无需编译即可运行它。只需将其粘贴到一个名为 的文件CustomSubscriber.java中,然后运行该命令java CustomSubscriber <some url>。它在每个块到达时打印它的内容。它还收集它们并在响应完成时将它们作为正文返回。

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import java.util.List;

public class CustomSubscriber {

  public static void main(String[] args) {
    CustomSubscriber cs = new CustomSubscriber();
    String body = cs.get(args[0]).join();
    System.out.println("--- Response body:\n: ..." + body + "...");
  }

  public CompletableFuture<String> get(String uri) {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(uri))
        .build();

    return client.sendAsync(request, responseInfo -> new StringSubscriber())
        .whenComplete((r, t) -> System.out.println("--- Status code " + r.statusCode()))
        .thenApply(HttpResponse::body);
  }

  static class StringSubscriber implements BodySubscriber<String> {

    final CompletableFuture<String> bodyCF = new CompletableFuture<>();
    Flow.Subscription subscription;
    List<ByteBuffer> responseData = new CopyOnWriteArrayList<>();

    @Override
    public CompletionStage<String> getBody() {
      return bodyCF;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      subscription.request(1); // Request first item
    }

    @Override
    public void onNext(List<ByteBuffer> buffers) {
      System.out.println("-- onNext " + buffers);
      try {
        System.out.println("\tBuffer Content:\n" + asString(buffers));
      } 
      catch (Exception e) {
        System.out.println("\tUnable to print buffer content");
      }
      buffers.forEach(ByteBuffer::rewind); // Rewind after reading
      responseData.addAll(buffers);
      subscription.request(1); // Request next item
    }

    @Override
    public void onError(Throwable throwable) {
      bodyCF.completeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
      bodyCF.complete(asString(responseData));
    }

    private String asString(List<ByteBuffer> buffers) {
      return new String(toBytes(buffers), StandardCharsets.UTF_8);
    }

    private byte[] toBytes(List<ByteBuffer> buffers) {
      int size = buffers.stream()
          .mapToInt(ByteBuffer::remaining)
          .sum();
      byte[] bs = new byte[size];
      int offset = 0;
      for (ByteBuffer buffer : buffers) {
        int remaining = buffer.remaining();
        buffer.get(bs, offset, remaining);
        offset += remaining;
      }
      return bs;
    }

  }
}

试一试

要测试此解决方案,您需要一个服务器来发送响应,该响应使用Transfer-encoding: chunked并以足够慢的速度发送以观察块的到达。我在https://github.com/hohonuuli/demo-chunk-server创建了一个,但您可以使用 Docker 来启动它,如下所示:

docker run -p 8080:8080 hohonuuli/demo-chunk-server

然后使用运行 CustomSubscriber.java 代码java CustomSubscriber.java http://localhost:8080/chunk/10

于 2018-10-30T05:25:53.603 回答
0

现在有一个新的 Java 库来解决 RxSON 的这种需求:https ://github.com/rxson/rxson 它利用 JsonPath 和 RxJava 在响应到达后立即从响应中读取 JSON 流块,并将它们解析为java 对象。

例子:

String serviceURL = "https://think.cs.vt.edu/corgis/datasets/json/airlines/airlines.json";
   HttpRequest req = HttpRequest.newBuilder(URI.create(serviceURL)).GET().build();
   RxSON rxson = new RxSON.Builder().build();

   String jsonPath = "$[*].Airport.Name";
   Flowable<String> airportStream = rxson.create(String.class, req, jsonPath);
   airportStream
       .doOnNext(it -> System.out.println("Received new item: " + it))
       //Just for test
       .toList()
       .blockingGet();
于 2020-10-23T20:14:53.073 回答