31

我正在玩 Java 8 可完成的期货。我有以下代码:

CountDownLatch waitLatch = new CountDownLatch(1);

CompletableFuture<?> future = CompletableFuture.runAsync(() -> {
    try {
        System.out.println("Wait");
        waitLatch.await(); //cancel should interrupt
        System.out.println("Done");
    } catch (InterruptedException e) {
        System.out.println("Interrupted");
        throw new RuntimeException(e);
    }
});

sleep(10); //give it some time to start (ugly, but works)
future.cancel(true);
System.out.println("Cancel called");

assertTrue(future.isCancelled());

assertTrue(future.isDone());
sleep(100); //give it some time to finish

使用 runAsync 我安排执行等待闩锁的代码。接下来我取消未来,期望内部抛出一个中断的异常。但似乎线程在 await 调用上仍然被阻塞,即使未来被取消(断言通过),也永远不会抛出 InterruptedException。使用 ExecutorService 的等效代码按预期工作。它是 CompletableFuture 还是我的示例中的错误?

4

6 回答 6

26

当您调用 时CompletableFuture#cancel,您只会停止链条的下游部分。上游部分,即最终将调用complete(...)or的东西completeExceptionally(...),没有得到任何不再需要结果的信号。

那些“上游”和“下游”的东西是什么?

让我们考虑以下代码:

CompletableFuture
        .supplyAsync(() -> "hello")               //1
        .thenApply(s -> s + " world!")            //2
        .thenAccept(s -> System.out.println(s));  //3

在这里,数据从上到下流动——从供应商创建,通过功能修改,到被消费println。特定步骤上方的部分称为上游,下方的部分称为下游。例如。第 1 步和第 2 步是第 3 步的上游。

以下是幕后发生的事情。这并不精确,而是对正在发生的事情的一种方便的思维模型。

  1. 供应商(步骤 1)正在执行(在 JVM 的 common 内ForkJoinPool)。
  2. 然后供应商的结果被传递complete(...)到下一个CompletableFuture下游。
  3. 收到结果后,将CompletableFuture调用下一步 - 一个函数(步骤 2),该函数接受上一步的结果并返回将进一步传递给下游CompletableFuturecomplete(...).
  4. 收到第 2 步的结果后,第 3 步调CompletableFuture用消费者,System.out.println(s). 消费者完成后,下游CompletableFuture将收到它的价值,(Void) null

正如我们所看到的,CompletableFuture这条链中的每个人都必须知道谁在下游等待将值传递给他们的complete(...)(或completeExceptionally(...))。但是CompletableFuture不必知道它的上游(或上游 - 可能有几个)。

因此,调用cancel()第 3 步不会中止第 1 步和第 2 步,因为没有从第 3 步到第 2 步的链接。

假设如果您正在使用,CompletableFuture那么您的步骤足够小,因此如果执行几个额外的步骤也不会造成伤害。

如果您希望取消在上游传播,您有两种选择:

  • 自己实现 - 创建一个专用的CompletableFuture(命名为cancelled),在每一步后检查(类似step.applyToEither(cancelled, Function.identity())
  • 使用 RxJava 2、ProjectReactor/Flux 或 Akka Streams 等反应式堆栈
于 2017-06-06T18:28:54.283 回答
18

显然,这是故意的。CompletableFuture::cancel方法的 Javadoc指出:

[参数:] mayInterruptIfRunning - 这个值在这个实现中没有影响,因为中断不用于控制处理。

有趣的是,方法ForkJoinTask::cancel对参数mayInterruptIfRunning使用了几乎相同的措辞。

我对这个问题有一个猜测:

  • 中断旨在与阻塞操作一起使用,例如sleepwait或 I/O 操作,
  • CompletableFutureForkJoinTask都不打算与阻塞操作一起使用。

CompletableFuture应该创建一个新的CompletionStage而不是阻塞,而 CPU 绑定任务是 fork-join 模型的先决条件。因此,对他们中的任何一个使用中断都会破坏他们的目的。另一方面,它可能会增加复杂性,如果按预期使用,这不是必需的。

于 2014-04-27T21:38:31.260 回答
5

如果您确实希望能够取消任务,那么您必须使用Future自身(例如,由返回ExecutorService.submit(Callable<T>),而不是。正如nosidCompletableFuture的回答中指出的那样,完全忽略对.CompletableFuturecancel(true)

我怀疑JDK团队没有实现中断是因为:

  1. 打断总是很老套,很难让人理解,也很难相处。Java I/O 系统甚至是不可中断的,尽管调用InputStream.read()是阻塞调用!(JDK 团队没有计划让标准 I/O 系统再次可中断,就像 Java 早期那样。)
  2. JDK 团队一直在非常努力地从早期的 Java 时代逐步淘汰旧的损坏 API,例如Object.finalize()Object.wait()Thread.stop()等。我相信这些 APIThread.interrupt()被认为是最终必须弃用和替换的东西。因此,较新的 API(如ForkJoinPoolCompletableFuture)已经不支持它。
  3. CompletableFuture设计用于构建 DAG 结构的操作管道,类似于 Java StreamAPI。很难简洁地描述数据流 DAG 的一个节点的中断应该如何影响 DAG 其余部分的执行。(当任何节点中断时,是否应该立即取消所有并发任务?)
  4. 我怀疑 JDK 团队只是不想处理正确的中断问题,因为 JDK 和库如今已达到的内部复杂程度。(lambda 系统的内部结构——呃。)

解决这个问题的一种非常老套的方法是让每个都将对CompletableFuture自身的引用导出到 external-visible AtomicReference,然后Thread可以在需要时从另一个外部线程直接中断引用。或者,如果您使用自己的 启动所有任务ExecutorService,在您自己的 中ThreadPool,您可以手动中断任何或所有已启动的线程,即使CompletableFuture拒绝通过cancel(true). (请注意,尽管CompletableFuturelambdas 不能抛出已检查的异常,因此如果您在 a 中有可中断的等待CompletableFuture,则必须将其作为未检查的异常重新抛出。)

更简单地说,您可以AtomicReference<Boolean> cancel = new AtomicReference<>()在外部范围内声明 an,并定期从每个CompletableFuture任务的 lambda 中检查此标志。

您还可以尝试设置实例的 DAGFuture而不是实例的 DAG CompletableFuture,这样您就可以准确地指定任何一个任务中的异常和中断/取消应该如何影响其他当前正在运行的任务。我在我的问题here的示例代码中展示了如何做到这一点,它运行良好,但它有很多样板。

于 2020-12-01T19:09:46.023 回答
2

您需要 CompletionStage 的替代实现来完成真正的线程中断。我刚刚发布了一个专门用于此目的的小型库 - https://github.com/vsilaev/tascalate-concurrent

于 2017-07-24T21:30:58.087 回答
1

即使Future.cancel(..)被调用,等待的调用仍然会阻塞。正如其他人所提到的,CompletableFuture不会使用中断来取消任务。

根据javadoc CompletableFuture.cancel(..)

mayInterruptIfRunning这个值在这个实现中没有影响,因为中断不是用来控制处理的。

即使实现会导致中断,您仍然需要阻塞操作才能取消任务或通过Thread.interrupted().

您可以在操作中设置检查点,而不是中断Thread(这可能并不总是那么容易),您可以在其中优雅地终止当前任务。这可以在将要处理的某些元素的循环中完成,或者您在操作的每个步骤之前检查取消状态并CancellationException自己抛出一个。

棘手的部分是获取CompletableFuture任务内的引用以便调用Future.isCancelled(). 这是如何完成的示例:

public abstract class CancelableTask<T> {

    private CompletableFuture<T> task;

    private T run() {
        try {
            return compute();
        } catch (Throwable e) {
            task.completeExceptionally(e);
        }
        return null;
    }

    protected abstract T compute() throws Exception;

    protected boolean isCancelled() {
        Future<T> future = task;
        return future != null && future.isCancelled();
    }

    public Future<T> start() {
        synchronized (this) {
            if (task != null) throw new IllegalStateException("Task already started.");
            task = new CompletableFuture<>();
        }
        return task.completeAsync(this::run);
    }
}

编辑:这里将改进CancelableTask版本作为静态工厂:

public static <T> CompletableFuture<T> supplyAsync(Function<Future<T>, T> operation) {
    CompletableFuture<T> future = new CompletableFuture<>();
    return future.completeAsync(() -> operation.apply(future));
}

这是测试方法:

@Test
void testFuture() throws InterruptedException {
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch done = new CountDownLatch(1);

    AtomicInteger counter = new AtomicInteger();
    Future<Object> future = supplyAsync(task -> {
        started.countDown();
        while (!task.isCancelled()) {
            System.out.println("Count: " + counter.getAndIncrement());
        }
        System.out.println("Task cancelled");
        done.countDown();
        return null;
    });

    // wait until the task is started
    assertTrue(started.await(5, TimeUnit.SECONDS));
    future.cancel(true);
    System.out.println("Cancel called");

    assertTrue(future.isCancelled());

    assertTrue(future.isDone());
    assertTrue(done.await(5, TimeUnit.SECONDS));
}

如果您真的想使用除 之外的中断CompletableFuture,那么您可以将自定义传递ExecutorCompletableFuture.completeAsync(..)您创建自己的位置Thread,覆盖cancel(..)CompletableFuture中断您的Thread.

于 2021-03-23T23:50:43.223 回答
0

CancellationException 是内部 ForkJoin 取消例程的一部分。当您检索未来的结果时,将出现异常:

try { future.get(); }
      catch (Exception e){
          System.out.println(e.toString());            
      }

花了一段时间在调试器中看到这个。JavaDoc 并不清楚正在发生什么或您应该期待什么。

于 2014-04-27T22:25:57.503 回答