2

我想知道是否有一种方法可以在没有太多开销的 scala 中的另一个线程上执行非常简单的任务?

基本上我想制作一个可以处理执行任意数量的任务的全局“执行器”。然后我可以使用 executor 来构建额外的结构。

此外,如果客户端不必考虑阻塞或非阻塞因素,那就太好了。

我知道 scala 演员库是建立在 Doug Lea FJ 的东西之上的,而且他们在有限的程度上支持我想要完成的事情。但是,据我了解,我将不得不预先分配一个“演员池”来完成。

我想避免为此创建一个全局线程池,因为据我了解,它在细粒度并行性方面并不是那么好。

这是一个简单的例子:

import concurrent.SyncVar
object SimpleExecutor {
  import actors.Actor._
  def exec[A](task:  => A) : SyncVar[A] = {
    //what goes here?
    //This is what I currently have
    val x = new concurrent.SyncVar[A]
    //The overhead of making the actor appears to be a killer
    actor {
      x.set(task)
    }
    x
  }
  //Not really sure what to stick here
  def execBlocker[A](task: => A) : SyncVar[A] = exec(task)

}

现在是使用 exec 的示例:

object Examples {
  //Benchmarks a task
  def benchmark(blk : => Unit) = {
    val start = System.nanoTime
    blk
    System.nanoTime - start
  }

  //Benchmarks and compares 2 tasks
  def cmp(a: => Any, b: => Any) = {
    val at = benchmark(a)
    val bt = benchmark(b)
    println(at + " " + bt + " " +at.toDouble / bt)
  }

  //Simple example for simple non blocking comparison
  import SimpleExecutor._
  def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

  //Simple example for the blocking performance
  import Thread.sleep
  def paraSle(hi : Int) = (0 until hi) map (i=>exec(sleep(i))) foreach (_.get)
  def singSle(hi : Int) = (0 until hi) foreach (i=>sleep(i))
}

最后运行示例(可能需要执行几次,以便 HotSpot 可以预热):

import Examples._
cmp(paraAdd(10000), singAdd(10000))
cmp(paraSle(100), singSle(100))
4

1 回答 1

8

这就是Futures为之而生的。只是import scala.actors.Futures._,用于future创建新的期货,方法比如awaitAll等待结果一段时间,apply或者respond阻塞直到收到结果,isSet看看它是否准备好等等。

您也不需要创建线程池。或者,至少,通常情况下你不会。为什么你认为你这样做?

编辑

将整数加法这样简单的东西并行化是无法获得性能的,因为这比函数调用还要快。并发只会通过避免因阻塞 i/o 而浪费时间和使用多个 CPU 内核并行执行任务来提高性能。在后一种情况下,任务的计算成本必须足够高,以抵消划分工作负载和合并结果的成本。

使用并发的另一个原因是提高应用程序的响应能力。这不是让它更快,而是让它更快地响应用户,而这样做的一种方法是将相对快速的操作卸载到另一个线程,以便处理用户看到或所做的线程可以更快。但我离题了。

您的代码存在严重问题:

  def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

或者,转化为期货,

  def paraAdd(hi: Int) = (0 until hi) map (i=>future(i+5)) foreach (_.apply)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

你可能认为paraAdd并行执行任务,但事实并非如此,因为它Range有一个非严格的实现map(这取决于 Scala 2.7;从 Scala 2.8.0 开始,Range是严格的)。您可以在其他 Scala 问题上查找它。会发生什么:

  1. 创建一个范围,从0直到hi
  2. 范围投影是从范围的每个元素 i 创建到future(i+5)调用时返回的函数中。
  3. 对于范围投影 ( 的每个元素i => future(i+5)),对元素求值 (foreach是严格的),然后在apply其上调用函数。

因此,因为在第 2 步future没有调用,而仅在第 3 步中调用,所以您将等待每个future完成,然后再执行下一个。您可以使用以下方法修复它:

  def paraAdd(hi: Int) = (0 until hi).force map (i=>future(i+5)) foreach (_.apply)

这将为您提供更好的性能,但永远不如简单的立即添加。另一方面,假设您这样做:

def repeat(n: Int, f: => Any) = (0 until n) foreach (_ => f)
def paraRepeat(n: Int, f: => Any) = 
  (0 until n).force map (_ => future(f)) foreach (_.apply)

然后比较:

cmp(repeat(100, singAdd(100000)), paraRepeat(100, singAdd(100000)))

您可能会开始看到收益(这将取决于内核数量和处理器速度)。

于 2009-09-02T21:52:24.220 回答