首先,让我用 2 个静态函数重写您的代码,以便更容易看到发生了什么:
// Make an executor equivalent to Executors.newFixedThreadPool(nThreads)
// that will trace to standard error when a task begins or ends
static ExecutorService loggingExecutor(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.err.println("Executor beginning task on thread: "
+ t.getName());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.err.println("Executor finishing task on thread: "
+ Thread.currentThread().getName());
}
};
}
和
// same as what you pass to thenComposeAsync
static Function<Boolean, CompletableFuture<Boolean>> inner(Executor executor) {
return b -> {
System.err.println(Thread.currentThread().getName()
+ ": About to enqueue task");
CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
executor.execute(() -> {
System.err.println(Thread.currentThread().getName()
+ ": Inner task");
innerFuture.complete(true);
});
System.err.println(Thread.currentThread().getName()
+ ": Task enqueued");
return innerFuture;
};
}
现在我们可以编写你的测试用例如下:
ExecutorService e = loggingExecutor(1);
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.join();
e.shutdown();
/* Output before deadlock:
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/
让我们测试您的结论,即在计算出第二个未来的结果之前不会释放第一个线程:
ExecutorService e = loggingExecutor(2); // use 2 threads this time
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.join();
e.shutdown();
/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
Executor finishing task on thread: pool-1-thread-1
*/
实际上,线程 1 似乎一直保持到线程 2 完成
让我们看看你是否正确,thenComposeAsync
它本身会阻塞:
ExecutorService e = loggingExecutor(1);
CompletableFuture<Boolean> future =
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e);
System.err.println("thenComposeAsync returned");
future.join();
e.shutdown();
/*
thenComposeAsync returned
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/
thenComposeAsync
没有阻止。它立即返回,CompletableFuture
并且仅在我们尝试完成它时才发生死锁。那么完成返回的future应该怎么做.thenComposeAsync(inner(e), e)
呢?
- API需要等待innner(e)返回
CompletableFuture<Boolean>
- 它需要等待返回
CompletableFuture<Boolean>
也完成。只有这样,未来才是完整的。因此,如您所见,它不能按照您的建议执行并返回不完整的 Future。
它是一个错误吗?为什么 CompletionStage 在计算内部任务时会保留线程 1?正如您所指出的,这不是一个错误,因为文档非常模糊,并且不承诺以任何特定顺序释放线程。另外,请注意 Thread1 将用于then*()
CompletableFuture 的任何后续方法。考虑以下:
ExecutorService e = loggingExecutor(2);
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.thenRun(() -> System.err.println(Thread.currentThread().getName()
+ ": All done"))
.join();
e.shutdown();
/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
pool-1-thread-1: All done
Executor finishing task on thread: pool-1-thread-1
*/
如您所见,.thenRun(...) 在线程 1 上执行。我相信这与 CompletableFuture 的其他 *Async(... , Executor exec) 方法一致。
但是,如果您想将 的功能拆分thenComposeAsync
为 2 个单独可控的步骤,而不是将其留给 API 来处理线程怎么办?你可以这样做:
ExecutorService e = loggingExecutor(1);
completedFuture(true)
.thenApplyAsync(inner(e), e) // do the async part first
.thenCompose(x -> x) // compose separately
.thenRun(() -> System.err.println(Thread.currentThread().getName()
+ ": All done"))
.join();
e.shutdown();
一切都将在 1 个线程上运行良好,没有死锁。
总之,正如您所说,这种行为是不直观的吗?我不知道。我无法想象为什么thenComposeAsync
甚至存在。如果一个方法返回CompletableFuture
,它不应该阻塞,也不应该有理由异步调用它。