6

I have written up a contrived code example, and it might not be code that someone ought to use, but I believe it should work. However it instead deadlocks. I've read the answers described here, but found them insufficient.

Here is the code example:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Test {

    public static void main(String argv[]) throws Exception {
        
        int nThreads = 1;
        Executor executor = Executors.newFixedThreadPool( nThreads );
        

        
        CompletableFuture.completedFuture(true)
            .thenComposeAsync((unused)->{
            
                System.err.println("About to enqueue task");
                CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
                executor.execute(() -> {

                    // pretend this is some really expensive computation done asynchronously

                    System.err.println("Inner task");
                    innerFuture.complete(true);
                });
                System.err.println("Task enqueued");
                             
                return innerFuture;
            }, executor).get();

        System.err.println("All done");
        System.exit(0);

    }
    
}

This prints:

About to enqueue task

Task enqueued

And then it hangs. It's deadlocked because the executor only has a single thread, and it's waiting for the innerFuture to become redeemable. Why does "thenComposeAsync" block for its return value to become redeemable, instead of returning the still-incomplete future and freeing up its thread in the executor?

This feels completely unintuitive, and the javadocs don't really help. Am I fundamentally misunderstanding how CompletionStages work? Or is this a bug in the implementation?

4

2 回答 2

3

因此,在进行了很多有趣的对话后,我决定给 JDK 的一位作者发送电子邮件。发现这种行为不是故意的,并且确实是 1.8u25 中存在的错误。有一个修复程序将随 Java 8 的更高补丁版本一起发布。我不知道是哪个。对于任何想要测试新行为的人,您可以在此处下载最新的 jsr166 jar:

http://gee.cs.oswego.edu/dl/concurrency-interest/index.html

于 2014-12-21T04:55:05.740 回答
3

首先,让我用 2 个静态函数重写您的代码,以便更容易看到发生了什么:

// Make an executor equivalent to Executors.newFixedThreadPool(nThreads)
// that will trace to standard error when a task begins or ends
static ExecutorService loggingExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>()) {

                @Override
                protected void beforeExecute(Thread t, Runnable r) {
                    System.err.println("Executor beginning task on thread: " 
                       + t.getName());
                }

                @Override
                protected void afterExecute(Runnable r, Throwable t) {
                    System.err.println("Executor finishing task on thread: "
                       + Thread.currentThread().getName());
                }

            };
}

// same as what you pass to thenComposeAsync
static Function<Boolean, CompletableFuture<Boolean>> inner(Executor executor) {
    return b -> {
        System.err.println(Thread.currentThread().getName() 
                   + ": About to enqueue task");
        CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
        executor.execute(() -> {
            System.err.println(Thread.currentThread().getName() 
                   + ": Inner task");
            innerFuture.complete(true);
        });
        System.err.println(Thread.currentThread().getName() 
                   + ": Task enqueued");

        return innerFuture;
    };
}

现在我们可以编写你的测试用例如下:

ExecutorService e = loggingExecutor(1);

CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e)
        .join();

e.shutdown();

/* Output before deadlock:
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/

让我们测试您的结论,即在计算出第二个未来的结果之前不会释放第一个线程:

ExecutorService e = loggingExecutor(2);  // use 2 threads this time

CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e)
        .join();

e.shutdown();

/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
Executor finishing task on thread: pool-1-thread-1
*/

实际上,线程 1 似乎一直保持到线程 2 完成

让我们看看你是否正确,thenComposeAsync它本身会阻塞:

ExecutorService e = loggingExecutor(1);

CompletableFuture<Boolean> future = 
        CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e);

System.err.println("thenComposeAsync returned");

future.join();

e.shutdown();

/*
thenComposeAsync returned
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/

thenComposeAsync没有阻止。它立即返回,CompletableFuture并且仅在我们尝试完成它时才发生死锁。那么完成返回的future应该怎么做.thenComposeAsync(inner(e), e)呢?

  1. API需要等待innner(e)返回CompletableFuture<Boolean>
  2. 它需要等待返回CompletableFuture<Boolean>也完成。只有这样,未来才是完整的。因此,如您所见,它不能按照您的建议执行并返回不完整的 Future。

它是一个错误吗?为什么 CompletionStage 在计算内部任务时会保留线程 1?正如您所指出的,这不是一个错误,因为文档非常模糊,并且不承诺以任何特定顺序释放线程。另外,请注意 Thread1 将用于then*()CompletableFuture 的任何后续方法。考虑以下:

ExecutorService e = loggingExecutor(2);

CompletableFuture.completedFuture(true)
        .thenComposeAsync(inner(e), e)
        .thenRun(() -> System.err.println(Thread.currentThread().getName()
                         + ": All done"))
        .join();

e.shutdown();

/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
pool-1-thread-1: All done
Executor finishing task on thread: pool-1-thread-1
*/

如您所见,.thenRun(...) 在线程 1 上执行。我相信这与 CompletableFuture 的其他 *Async(... , Executor exec) 方法一致。

但是,如果您想将 的功能拆分thenComposeAsync为 2 个单独可控的步骤,而不是将其留给 API 来处理线程怎么办?你可以这样做:

ExecutorService e = loggingExecutor(1);

completedFuture(true)
        .thenApplyAsync(inner(e), e) // do the async part first
        .thenCompose(x -> x)         // compose separately
        .thenRun(() -> System.err.println(Thread.currentThread().getName()
                        + ": All done"))
        .join();

e.shutdown();

一切都将在 1 个线程上运行良好,没有死锁。

总之,正如您所说,这种行为是不直观的吗?我不知道。我无法想象为什么thenComposeAsync甚至存在。如果一个方法返回CompletableFuture,它不应该阻塞,也不应该有理由异步调用它。

于 2014-12-19T06:53:52.437 回答