1

从这个链接,我只是部分理解,至少在某些时候,java嵌套并行流存在问题。但是,我无法推断出以下问题的答案:

假设我有一个外部 srtream 和一个内部流,它们都使用并行流。事实证明,根据我的计算,如果内部流首先完全并行完成,然后(如果且仅cpu核心可用)做外部流。我认为这对于大多数人的情况都是正确的。所以我的问题是:

Java会先并行执行内部流,然后再处理外部流吗?如果是这样,它是在编译时还是在运行时做出决定?如果在运行时,JIT 是否足够聪明地意识到如果内部流确实有超过足够的元素(例如数百个)而不是核心数(32),那么它肯定应该使用所有 32 个核心来处理在从外部流移动到下一个元素之前的内部流;但是,如果元素的数量很小(例如 < 32),那么“也并行处理”来自“下一个”外部流元素的元素是可以的。

4

2 回答 2

7

也许下面的示例程序可以说明这个问题:

IntStream.range(0, 10).parallel().mapToObj(i -> "outer "+i)
         .map(outer -> outer+"\t"+IntStream.range(0, 10).parallel()
            .mapToObj(inner -> Thread.currentThread())
            .distinct() // using the identity of the threads
            .map(Thread::getName) // just to be paranoid, as names might not be unique
            .sorted()
            .collect(Collectors.toList()) )
         .collect(Collectors.toList())
         .forEach(System.out::println);

当然,结果会有所不同,但我机器上的输出看起来与此类似:

outer 0 [ForkJoinPool.commonPool-worker-6]
outer 1 [ForkJoinPool.commonPool-worker-3]
outer 2 [ForkJoinPool.commonPool-worker-1]
outer 3 [ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-4, ForkJoinPool.commonPool-worker-5]
outer 4 [ForkJoinPool.commonPool-worker-5]
outer 5 [ForkJoinPool.commonPool-worker-2, ForkJoinPool.commonPool-worker-4, ForkJoinPool.commonPool-worker-7, main]
outer 6 [main]
outer 7 [ForkJoinPool.commonPool-worker-4]
outer 8 [ForkJoinPool.commonPool-worker-2]
outer 9 [ForkJoinPool.commonPool-worker-7]

我们在这里可以看到,对于我的机器,有 8 个内核,7 个工作线程正在参与工作,以利用所有内核,至于common pool,调用者线程也将参与工作,而不仅仅是等待完成。您可以清楚地看到main输出中的线程。

此外,您可以看到外部流获得了完全的并行性,而一些内部流完全由单个线程处理。每个工作线程都对至少一个外部流的元素有贡献。如果您将外部流的大小减少到核心数,您很可能会看到恰好有一个工作线程处理一个外部流元素,这意味着所有内部流的完全顺序执行。

但是我使用了一个与内核数量不匹配的数字,甚至不是它的倍数,来演示另一种行为。由于外部流处理的工作负载不均匀,即一些线程只处理一项,而另一些线​​程处理两项,这些空闲的工作线程执行工作窃取,贡献了剩余外部元素的内部流处理。

这种行为背后有一个简单的理由。当外部流的处理开始时,它并不知道它将是一个“外部流”。它只是一个并行流,除了处理它之外,没有办法确定这是否是外部流,直到其中一个函数启动另一个流操作。但是将并行处理推迟到可能永远不会到来的这一点是没有意义的。

除此之外,我强烈反对您的假设“如果内部流首先完全并行完成,它的性能会更高 [...]”。对于典型的用例,我宁愿反过来期待它,阅读,期待这样做的优势,就像它已经实现的那样。但是,正如上一段中所解释的,无论如何都没有合理的方法来实现并行处理内部流的偏好。

于 2017-08-08T16:58:10.637 回答
1

根据我刚才写的小测试答案是no(约Would Java execute inner stream all in parallel first, and then work on outerstream)。请注意,默认情况下,在我的机器上,将使用 4 个用于流操作的线程。

    List<Integer> first = List.of(1, 2, 3, 4);
    List<Integer> second = List.of(5, 6, 7, 8);

    first.stream().parallel()
            .peek(x -> {
                System.out.println("first : " + x + " " + Thread.currentThread().getName());
            })
            .map(x -> second.stream().parallel().peek(y -> {

                System.out.println("second : " + y + " " + Thread.currentThread().getName());

            }).collect(Collectors.toList()))
            .filter(x -> true)
            .collect(Collectors.toList());

您可以从输出中看到内部流没有首先执行。您可以增加每个流中的元素数量以获得更准确的输出(交错“第一”和“第二” - 不知道它是否是正确的术语)。

但是这里还有一些让我印象深刻的东西......上面的例子如何不阻塞超出了我的范围。只有 4 个线程和 4 个元素,所有线程都在等待内部流处理;但是ForkJoinPool没有可用的线程 - 那么它是如何工作的呢?您提供的链接(@Holger 的回答)说创建的线程将比您实际请求的线程多。但是输出中缺少他们的名字......

于 2017-08-08T15:17:55.813 回答