27

以下是我对Java 8的Stream 框架的理解:

  1. 某些东西创建了源流
  2. 该实现负责提供一个BaseStream#parallel()方法,该方法又返回一个可以并行运行其操作的 Stream。

虽然有人已经找到了一种将自定义线程池与 Stream 框架的并行执行一起使用的方法,但我终生无法在 Java 8 API 中找到任何提及默认 Java 8 并行 Stream 实现将使用ForkJoinPool#commonPool()的内容。(Collection#parallelStream()StreamSupport类中的方法,以及我不知道的 API 中其他可能的并行启用流来源)。

我可以从搜索结果中看到的只有这些花絮:


所以我的问题是:

哪里说ForkJoinPool#commonPool()用于对从 Java 8 API 获得的流进行并行操作?

4

3 回答 3

14

Wrt 在哪里记录了 Java 8 并行流使用 FJ 框架?

Afaik (Java 1.8u5) 在并行流的 JavaDoc 中没有提到使用通用的 ForkJoinPool。

但它在http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html底部的 ForkJoin 文档中有所提及

Wrt替换线程池

我的理解是您可以使用自定义 ForkJoinPool (而不是常见的) - 请参阅Java 8 并行流中的自定义线程池- 但不能使用与 ForkJoin 实现不同的自定义 ThreadPool (我在这里有一个悬而未决的问题:How (全局)替换 Java 并行流的通用线程池后端?

Wrt 替换 Streams api

您可以查看https://github.com/nurkiewicz/LazySeq,这是一个更像 Scala 的流实现 - 非常好,非常有趣

PS (wrt ForkJoin 和 Streams)

如果您有兴趣,我想指出我偶然发现了使用 FJ 池的一些问题,请参阅,例如

于 2014-07-08T10:45:56.493 回答
5

值得一提的是,Java 8 in Action 有一章是关于并行数据处理和性能的(第 7 章)。它说:

“...Stream 接口让您有机会在不费力的情况下对一组数据并行执行操作。”

“...您将看到 Java 如何实现这一奇迹,或者更实际地,通过采用 Java 7 中引入的 fork/join 框架,并行流如何在后台工作。”

它在第 7.1 节中还有一个小注解:

“并行流在内部使用默认的 ForkJoinPool ......默认情况下,它具有与处理器一样多的线程,由Runtime.getRuntime().availableProcessors().“返回

“您可以使用系统属性 java.util .concurrent.ForkJoinPool.common.parallelism 更改此池的大小,如下例所示:”

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");

如评论和其他答案中所述,这并不意味着它将始终使用 fork/join。

于 2015-08-05T21:12:32.173 回答
1

You can check source code of terminal operations on GrepCode. For example, lets take a look at ForEachOp. As you can see evaluateParallel method of ForEachOp creates and invokes ForEachTask object which is derived from CountedCompleter derived from ForkJoinTask.

于 2014-07-08T10:37:02.467 回答