也许下面的示例程序可以说明这个问题:
IntStream.range(0, 10).parallel().mapToObj(i -> "outer "+i)
.map(outer -> outer+"\t"+IntStream.range(0, 10).parallel()
.mapToObj(inner -> Thread.currentThread())
.distinct() // using the identity of the threads
.map(Thread::getName) // just to be paranoid, as names might not be unique
.sorted()
.collect(Collectors.toList()) )
.collect(Collectors.toList())
.forEach(System.out::println);
当然,结果会有所不同,但我机器上的输出看起来与此类似:
outer 0 [ForkJoinPool.commonPool-worker-6]
outer 1 [ForkJoinPool.commonPool-worker-3]
outer 2 [ForkJoinPool.commonPool-worker-1]
outer 3 [ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-4, ForkJoinPool.commonPool-worker-5]
outer 4 [ForkJoinPool.commonPool-worker-5]
outer 5 [ForkJoinPool.commonPool-worker-2, ForkJoinPool.commonPool-worker-4, ForkJoinPool.commonPool-worker-7, main]
outer 6 [main]
outer 7 [ForkJoinPool.commonPool-worker-4]
outer 8 [ForkJoinPool.commonPool-worker-2]
outer 9 [ForkJoinPool.commonPool-worker-7]
我们在这里可以看到,对于我的机器,有 8 个内核,7 个工作线程正在参与工作,以利用所有内核,至于common pool,调用者线程也将参与工作,而不仅仅是等待完成。您可以清楚地看到main
输出中的线程。
此外,您可以看到外部流获得了完全的并行性,而一些内部流完全由单个线程处理。每个工作线程都对至少一个外部流的元素有贡献。如果您将外部流的大小减少到核心数,您很可能会看到恰好有一个工作线程处理一个外部流元素,这意味着所有内部流的完全顺序执行。
但是我使用了一个与内核数量不匹配的数字,甚至不是它的倍数,来演示另一种行为。由于外部流处理的工作负载不均匀,即一些线程只处理一项,而另一些线程处理两项,这些空闲的工作线程执行工作窃取,贡献了剩余外部元素的内部流处理。
这种行为背后有一个简单的理由。当外部流的处理开始时,它并不知道它将是一个“外部流”。它只是一个并行流,除了处理它之外,没有办法确定这是否是外部流,直到其中一个函数启动另一个流操作。但是将并行处理推迟到可能永远不会到来的这一点是没有意义的。
除此之外,我强烈反对您的假设“如果内部流首先完全并行完成,它的性能会更高 [...]”。对于典型的用例,我宁愿反过来期待它,阅读,期待这样做的优势,就像它已经实现的那样。但是,正如上一段中所解释的,无论如何都没有合理的方法来实现并行处理内部流的偏好。