43

我对 CompletableFuture 方法有疑问:

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)

问题是 JavaDoc 就是这样说的:

返回一个新的 CompletionStage,当此阶段正常完成时,将使用此阶段的结果作为所提供函数的参数来执行该阶段。有关异常完成的规则​​,请参阅 CompletionStage 文档。

线程呢?这将在哪个线程中执行?如果future是由一个线程池完成的呢?

4

5 回答 5

50

正如@nullpointer指出的那样,文档会告诉您您需要了解的内容。但是,相关文本出奇地模糊,此处发布的一些评论(和答案)似乎依赖于文档不支持的假设。因此,我认为将其分开是值得的。具体来说,我们应该非常仔细地阅读这一段:

为非异步方法的依赖完成提供的操作可以由完成当前 CompletableFuture 的线程执行,或者由完成方法的任何其他调用者执行。

听起来很简单,但细节很简单。它似乎故意避免描述何时可以在完成线程上调用依赖完成,而不是在调用完成方法(如thenApply. 如所写,上面的段落实际上是在乞求我们用假设来填补空白。这很危险,尤其是当主题涉及并发和异步编程时,我们作为程序员开发的许多期望都被推翻了。让我们仔细看看文档没有说什么。

该文档并未声称在调用之前complete()注册的依赖完成将在完成线程上运行。此外,虽然它声明在调用完成方法时可能thenApply会调用依赖完成,但它并未声明将在注册它的线程上调用完成(注意“任何其他”一词)。

CompletableFuture对于任何用来安排和编写任务的人来说,这些都是潜在的重要点。考虑以下事件序列:

  1. 线程 A 通过 注册一个依赖完成f.thenApply(c1)
  2. 一段时间后,线程 B 调用f.complete().
  3. 大约在同一时间,线程 C 通过 注册另一个依赖完成f.thenApply(c2)

从概念上讲,complete()它做了两件事:它发布未来的结果,然后它尝试调用依赖完成。现在,如果线程 C在发布结果值之后运行,但线程 B 开始调用之前会发生c1什么?根据实现,线程 C 可能会看到f已经完成,然后它可能会调用c1 and c2。或者,线程 C 可以调用c2,同时让线程 B 调用c1。该文件不排除任何一种可能性。考虑到这一点,以下是文档不支持的假设:

  1. 在完成之前c注册的依赖完成将在调用期间被调用;f f.complete()
  2. c将在f.complete()返回时完成;
  3. 将按任何特定顺序(例如,注册顺序)调用依赖完成;
  4. 在完成之前 注册的依赖完成将在完成f注册的完成之前调用。 f

考虑另一个例子:

  1. 线程 A 调用f.complete()
  2. f.thenApply(c1)一段时间后,线程 B 通过;注册完成。
  3. 大约在同一时间,线程 C 通过 注册一个单独的完成f.thenApply(c2)

如果已知它f已经运行到完成,人们可能会假设c1将在 期间调用f.thenApply(c1)并且c2将在 期间调用f.thenApply(c2)。人们可能会进一步假设在返回c1时将运行完成f.thenApply(c1)。但是,文档支持这些假设。有可能其中一个线程调用thenApply最终调用 c1and c2,而另一个线程调用了两者都不调用。

对 JDK 代码的仔细分析可以确定上述假设场景的结果。但即使这样也是有风险的,因为您最终可能会依赖(1)不可移植或(2)可能更改的实现细节。您最好的选择是不要假设 javadocs 或原始 JSR 规范中没有说明的任何内容。

tldr:小心你的假设,当你写文档时,尽可能清晰和深思熟虑。虽然简洁是一件美妙的事情,但要警惕人类填补空白的倾向。

于 2017-09-05T20:31:27.090 回答
31

CompletableFuture文档中指定的策略可以帮助您更好地理解:

  • 为非异步方法的依赖完成提供的操作可以由完成当前 CompletableFuture 的线程执行, 或者由完成方法的任何其他调用者执行

  • 所有没有显式 Executor 参数的异步方法都使用 Executor 执行ForkJoinPool.commonPool()(除非它不支持 至少两个并行级别,在这种情况下,会创建一个新线程来运行每个任务)。为了简化监控、调试和跟踪,所有生成的异步任务都是标记接口的实例CompletableFuture.AsynchronousCompletionTask

更新:我还建议阅读@Mike 的这个答案,作为对文档细节的有趣分析。

于 2017-09-05T17:41:15.430 回答
9

Javadoc

为非异步方法的依赖完成提供的操作可以由完成当前 CompletableFuture 的线程执行,或者由完成方法的任何其他调用者执行。

更具体地说:

  • fn将在调用期间在complete()任何线程调用的上下文中运行complete()

  • 如果在被调用complete()的时候已经完成,将在线程调用的上下文中运行。thenApply()fnthenApply()

于 2017-09-05T17:36:46.170 回答
7

在线程方面,缺少 API 文档。理解线程和期货是如何工作的需要一些推理。从一个假设开始:非Async方法CompletableFuture不会自行产生新线程。工作将在现有线程下进行。

thenApply将在原来CompletableFuture的线程中运行。这要么是调用的线程,要么是complete()调用thenApply()未来已经完成的线程。如果你想控制线程——如果fn是一个缓慢的操作是个好主意——那么你应该使用thenApplyAsync.

于 2017-09-05T17:33:49.053 回答
1

我知道这个问题很老,但我想用源代码来解释这个问题。

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

private CompletableFuture<Void> uniAcceptStage(Executor e,
                                               Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    Object r;
    if ((r = result) != null)
        return uniAcceptNow(r, e, f);
    CompletableFuture<Void> d = newIncompleteFuture();
    unipush(new UniAccept<T>(e, d, this, f));
    return d;
}

这是来自 java 16 的源代码,我们可以看到,如果我们触发 thenAccept,我们会将一个空的执行器服务引用传递给我们的函数。从第二个函数 uniAcceptStage() 第二个 if 条件。如果结果不为空,它将触发 uniAcceptNow()

if (e != null) {
     e.execute(new UniAccept<T>(null, d, this, f));
} else {
     @SuppressWarnings("unchecked") T t = (T) r;
     f.accept(t);
     d.result = NIL;
}

如果执行器服务为空,我们将使用 lambda 函数 f.accept(t) 来执行它。如果我们从主线程触发 thenApply/thenAccept,它将使用主线程作为执行线程。

但是,如果我们无法从上一个可完成的未来中获得先前的结果,我们将使用 uniPush 函数将当前的 UniAccept/Apply 推入堆栈。UniAccept 类有 tryFire() 将由我们的 postComplete() 函数触发

final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run.  It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        if (STACK.compareAndSet(f, h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                NEXT.compareAndSet(h, t, null); // try to detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}
于 2022-01-16T22:02:23.810 回答