13

在 Scala 2.10 之前的版本中,我可以在 defaultForkJoinPool 中设置并行度(如this answer scala parallel collections degree of parallelism)。在 Scala 2.10 中,该 API 不再存在。有据可查的是,我们可以通过分配给它的 taskSupport 属性来设置单个集合( http://docs.scala-lang.org/overviews/parallel-collections/configuration.html )的并行度。

但是,我在整个代码库中都使用并行集合,并且不想在每个集合实例化中添加额外的两行。有没有办法配置全局默认线程池大小,以便someCollection.par.map(f(_))自动使用默认线程数?

4

1 回答 1

17

我知道这个问题已经有一个多月了,但我刚刚提出了完全相同的问题。谷歌搜索没有帮助,我在新 API 中找不到任何看起来很正常的东西。

按照此处的建议设置 -Dscala.concurrent.context.maxThreads=n:Set the parallelism level for all collections in Scala 2.10? 似乎根本没有效果,但我不确定我是否正确使用它(我在没有明确安装'scala'的环境中使用'java'运行我的应用程序,这可能是原因)。

我不知道为什么 scala-people 从适当的包对象中删除了这个必要的设置器。

但是,通常可以使用反射来解决不完整/奇怪的界面:

def setParallelismGlobally(numThreads: Int): Unit = {
  val parPkgObj = scala.collection.parallel.`package`
  val defaultTaskSupportField = parPkgObj.getClass.getDeclaredFields.find{
    _.getName == "defaultTaskSupport"
  }.get

  defaultTaskSupportField.setAccessible(true)
  defaultTaskSupportField.set(
    parPkgObj, 
    new scala.collection.parallel.ForkJoinTaskSupport(
      new scala.concurrent.forkjoin.ForkJoinPool(numThreads)
    ) 
  )
}

对于那些不熟悉 Scala 更晦涩的特性的人,这里有一个简短的解释:

scala.collection.parallel.`package`

使用 defaultTaskSupport 变量访问包对象(它看起来有点像 Java 的静态变量,但它实际上是包对象的成员变量)。标识符需要反引号,因为package它是保留关键字。然后我们得到我们想要的私有最终字段(getField(“defaultTaskSupport”)由于某种原因不起作用?...),告诉它可以访问以便能够修改它,然后将它的值替换为我们自己的 ForkJoinTaskSupport。

我还不了解创建并行集合的确切机制,但 Combiner 特征的源代码表明 defaultTaskSupport 的值应该以某种方式渗透到并行集合中。

请注意,这个问题在性质上与一个更老的问题相同:“我的代码库中都有 Math.random(),如何将种子设置为固定数字以进行调试?” (参见例如:在 Math.random() 上设置种子)。在这两种情况下,我们都有某种全局“静态”变量,我们在百万个不同的地方隐式使用它,我们想要更改它,但是这个变量没有设置器 => 我们使用反射。

丑得要命,但似乎工作得很好。如果您需要限制线程总数,请不要忘记垃圾收集器在单独的线程上运行。

于 2013-09-02T13:22:30.633 回答