2

我有以下代码:

return CompletableFuture.supplyAsync(() -> {
    return foo; // some custom object
})
.thenAccept(foo -> {
     // ??? need to spawn N async parallel jobs that works on 'foo'
});

英文:第一个任务foo异步创建对象;然后我需要在它上面运行 N 个并行进程。

那么有没有更好的方法来做到这一点:

...
CompletableFuture[] parallel = new CompletableFuture[N];
for (int i = 0; i < N; i++) {
    parallel[i] = CompletableFuture.runAsync(() -> {
        work(foo);
    });
}
CompletableFuture.allOf(parallel).join();
...

我不喜欢这样,因为一个线程在等待 N 个作业完成时被锁定。

4

2 回答 2

2

您可以将任意数量的独立作业链接到特定的先决条件作业,例如

CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo());
Collections.nCopies(N, base).forEach(f -> f.thenAcceptAsync(foo -> work(foo)));

在完成提供实例的初始作业后,将产生N并行作业,并发调用。work(foo)Foo

But keep in mind, that the underlying framework will consider the number of available CPU cores to size the thread pool actually executing the parallel jobs, so if N > #cores, some of these jobs may run one after another.

If the work is I/O bound, thus, you want to have a higher number of parallel threads, you have to specify your own executor.


The nCopies/forEach is not necessary, a for loop would do as well, but it provides a hint of how to handle subsequent jobs, that depend on the completion of all these parallel jobs:

CompletableFuture<Foo> base=CompletableFuture.supplyAsync(() -> new Foo());
CompletableFuture<Void> all = CompletableFuture.allOf(
    Collections.nCopies(N, base).stream()
        .map(f -> f.thenAcceptAsync(foo -> work(foo)))
        .toArray(CompletableFuture<?>[]::new));

Now you can use all to check for the completion of all jobs or chain additional actions.

于 2016-06-16T09:54:19.840 回答
0

由于CompletableFuture.allOf已经返回另一个CompletableFuture<Void>a 您可以对其执行另一个.thenAccept操作并在回调中从 CF 中提取返回值parallel,这样您就可以避免调用join

于 2016-06-15T23:07:35.157 回答