我正在使用 CompletableFuture 异步执行从列表源生成的流。
所以我正在测试重载方法,即 CompletableFuture 的“supplyAsync”,其中一种方法仅采用单个供应商参数,而另一种采用供应商参数和执行器参数。这是两者的文档:
一
supplyAsync(供应商供应商)
返回一个新的 CompletableFuture,它由在 ForkJoinPool.commonPool() 中运行的任务异步完成,其值通过调用给定的供应商获得。
第二
supplyAsync(Supplier 供应商,Executor 执行者)
返回一个新的 CompletableFuture,它由在给定执行程序中运行的任务异步完成,其值通过调用给定供应商获得。
这是我的测试课:
public class TestCompleteableAndParallelStream {
public static void main(String[] args) {
List<MyTask> tasks = IntStream.range(0, 10)
.mapToObj(i -> new MyTask(1))
.collect(Collectors.toList());
useCompletableFuture(tasks);
useCompletableFutureWithExecutor(tasks);
}
public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
long start = System.nanoTime();
ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
System.out.println(result);
executor.shutdown();
}
public static void useCompletableFuture(List<MyTask> tasks) {
long start = System.nanoTime();
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
System.out.println(result);
}
}
class MyTask {
private final int duration;
public MyTask(int duration) {
this.duration = duration;
}
public int calculate() {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(duration * 1000);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
return duration;
}
}
“useCompletableFuture”方法大约需要 4 秒才能完成,而“useCompletableFutureWithExecutor”方法只需 1 秒即可完成。
不,我的问题是,可以做开销的 ForkJoinPool.commonPool() 有什么不同的处理?那我们不应该总是更喜欢自定义执行器池而不是 ForkJoinPool 吗?