4

我正在尝试通过新的 Java 11 HttpClient 取消 http 请求。

这是我的测试代码:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class App {

    public static void main(String... args) throws InterruptedException {
        HttpClient client = HttpClient.newBuilder().build();

        URI uri = URI.create("http://releases.ubuntu.com/18.04.2/ubuntu-18.04.2-desktop-amd64.iso");
        HttpRequest request = HttpRequest.newBuilder().uri(uri).GET().build();

        var bodyHandler = HttpResponse.BodyHandlers.ofByteArrayConsumer(b -> System.out.println("#"));
        var future = client.sendAsync(request, bodyHandler);
        Thread.sleep(1000);

        future.cancel(true);
        System.out.println("\r\n----------CANCEL!!!------------");
        System.out.println("\r\nisCancelled: " + future.isCancelled());
        Thread.sleep(250);
    }
}

我希望,该请求任务将在future.cancel(true);调用线路后立即取消。因此,控制台中的最后一个打印行应该是isCancelled: true

但是,当我运行此代码时,我会看到如下内容:

################################################# #################################################
-  -  -  -  - 取消!!! -  -  -  -  -  - 
####
已取消:真
################################################# ################################################# ################################################# #

这意味着,该请求任务在我取消后仍在运行……那么,这是取消请求的正确方法吗?

UPD

取消请求的正确方法是(正如丹尼尔建议的那样,+ UPD2:在cancel()方法调用时避免 NPE):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.ResponseInfo;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow.Subscription;

public class App {

    private static class SubscriberWrapper implements BodySubscriber<Void> {
        private final CountDownLatch latch;
        private final BodySubscriber<Void> subscriber;
        private Subscription subscription;

        private SubscriberWrapper(BodySubscriber<Void> subscriber, CountDownLatch latch) {
            this.subscriber = subscriber;
            this.latch = latch;
        }

        @Override
        public CompletionStage<Void> getBody() {
            return subscriber.getBody();
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            subscriber.onSubscribe(subscription);
            this.subscription = subscription;
            latch.countDown();
        }

        @Override
        public void onNext(List<ByteBuffer> item) {
            subscriber.onNext(item);
        }

        @Override
        public void onError(Throwable throwable) {
            subscriber.onError(throwable);
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
        }

        public void cancel() {
            subscription.cancel();
            System.out.println("\r\n----------CANCEL!!!------------");
        }
    }

    private static class BodyHandlerWrapper implements BodyHandler<Void> {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final BodyHandler<Void> handler;
        private SubscriberWrapper subscriberWrapper;

        private BodyHandlerWrapper(BodyHandler<Void> handler) {
            this.handler = handler;
        }

        @Override
        public BodySubscriber<Void> apply(ResponseInfo responseInfo) {
            subscriberWrapper = new SubscriberWrapper(handler.apply(responseInfo), latch);
            return subscriberWrapper;
        }

        public void cancel() {
            CompletableFuture.runAsync(() -> {
                try {
                    latch.await();
                    subscriberWrapper.cancel();
                } catch (InterruptedException e) {}
            });
        }
    }

    public static void main(String... args) throws InterruptedException, ExecutionException {
        HttpClient client = HttpClient.newBuilder().build();

        URI uri = URI.create("http://releases.ubuntu.com/18.04.2/ubuntu-18.04.2-desktop-amd64.iso");
        HttpRequest request = HttpRequest.newBuilder().uri(uri).GET().build();

        var handler = HttpResponse.BodyHandlers.ofByteArrayConsumer(b -> System.out.print("#"));
        BodyHandlerWrapper handlerWrapper = new BodyHandlerWrapper(handler);

        client.sendAsync(request, handlerWrapper).thenAccept(b -> System.out.println(b.statusCode()));
        Thread.sleep(1000);
        handlerWrapper.cancel();

        System.out.println("\r\n------Invoke cancel...---------");
        Thread.sleep(2500);
    }
}
4

3 回答 3

3

您可以通过取消传递给响应的对象来使用java.net.http.HttpClientAPI取消 HTTP 请求。为了获取订阅对象,简单地包装提供的/实现之一应该相对容易。不幸的是,客户端返回的方法与传递给. 取消请求的正确方法是通过订阅的方法。Flow.SubscriptionBodySubscriberBodyHandlerBodySubscribercancelCompletableFuturecancelFlow.SubscriptionBodySubscribercancel

取消订阅将同时使用同步 ( HttpClient::send) 和异步 ( HttpClient::sendAsync) 方法。但是,它会产生不同的效果,具体取决于请求是通过 HTTP/1.1 还是 HTTP/2.0 发送的(对于 HTTP/1.1,它将导致连接关闭,对于 HTTP/2.0,它将导致流被重置)。当然,如果响应的最后一个字节已经传送到BodySubscriber.

于 2019-03-20T10:28:49.690 回答
2

同步 VS 异步

请求可以同步或异步发送。同步 API 阻塞,直到 Http 响应可用

HttpResponse<String> response =
      client.send(request, BodyHandlers.ofString());
System.out.println(response.statusCode());
System.out.println(response.body());

异步 API 立即返回一个 CompletableFuture,当 HttpResponse 可用时完成。CompletableFuture 是在 Java 8 中添加的,并支持可组合的异步编程。

client.sendAsync(request, BodyHandlers.ofString())
      .thenApply(response -> { System.out.println(response.statusCode());
                               return response; } )
      .thenApply(HttpResponse::body)
      .thenAccept(System.out::println);

未来对象

Future 表示异步计算的结果。Java 文档

这意味着它不是同步函数,并且您的假设“我希望,该请求任务将在之后立即取消”仅适用于同步方法。

检查 Future 对象的取消

isCancelled()如果您想检查您的任务是否被取消,有一个有用的方法。

if(future.isCancelled()) {
  // Future object is cancelled, do smth
} else {
  // Future object is still running, do smth
}

sendAsync() 返回一个 CompletableFuture 对象

该方法sendAsync()返回一个CompletableFuture。注意 aCompletableFuture实现了 的接口Future

您可以执行以下操作:

client.sendAsync(request, BodyHandlers.ofString())
          .thenAccept(response -> {
       // do action when completed;
});

在技​​术术语中,该thenAccept方法添加了一个Consumer在响应可用时调用的方法。

为什么取消 CompletableFuture 的方法不起作用

由于(不同于FutureTask)此类无法直接控制导致其完成的计算,因此取消被视为另一种形式的异常完成。方法 cancel 与 具有相同的效果completeExceptionally(new CancellationException())。方法isCompletedExceptionally()可用于确定是否CompletableFuture以任何异常方式完成。

如果使用 a 异常完成CompletionException,方法get()get(long, TimeUnit)throw anExecutionException的原因与对应的CompletionException. 为了在大多数情况下简化使用,此类还定义了方法join()和 getNow(T),它们CompletionException在这些情况下直接抛出。

换句话说

cancel()方法不使用中断来进行取消,这就是它不起作用的原因。你应该使用completeExceptionally(new CancellationException())

参考

于 2019-03-17T16:44:35.560 回答
0

至少对于同步请求,您可以中断正在调用的线程httpClient.send(..)

然后,http 客户端尽可能快地中止请求并抛出InterruptedException自身。

于 2019-09-05T13:06:51.553 回答