1

我经常想要并行化依赖于非线程安全共享资源的任务。考虑以下非线程安全类。我想在data: Vector[String].

class Processor { def apply: String => String }

基本上,我想创建n线程,每个线程都有一个数据实例Processor和一个分区。Scala 并行集合让我觉得并行化应该很简单。但是,它们似乎不太适合这个问题。是的,我可以使用演员,但 Scala 演员可能会被弃用,而 Akka 似乎有点矫枉过正。

首先想到的是有一个同步的映射,然后使用并行集合,在这个线程安全的映射中Thread -> Processor查找我的。Processor有没有更好的办法?

4

2 回答 2

1

您可以使用ThreadLocal. 这将保证Processor每个线程都是唯一的。

val processors = new ThreadLocal[Processor] {
  def initialValue() = new Processor
}

data.par.map(x => processors.get.apply(x))
于 2012-04-27T13:57:25.120 回答
0

或者,您尝试使用配置为显式使用指定数量的线程的执行器服务:

  val processors = new ThreadLocal[Processor] {
    override def initialValue() = new Processor
  }

  val N = 4

  // create an executor with fixed number of threads
  val execSvc = Executors.newFixedThreadPool(N)

  // create the tasks
  data foreach {
    loopData =>
      execSvc.submit(new Runnable() {
        def run = processors.get().apply(loopData)
      })
  }

  // await termination
  execSvc.shutdown()
  while(!execSvc.awaitTermination(1, TimeUnit.SECONDS)) {
    ;
  }

  // processing complete!
于 2012-04-27T16:30:04.903 回答