
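I have a compute-intensive task that can be distributed, but its final results need to be accumulated and passed on to another method.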

Concretely, suppose I am counting the words in each file of a large collection of text files.

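The version I have come up with so far looks like this (it uses scala.actors, so it needs Scala 2.10 or the separate scala-actors module):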

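import java.io.File
import scala.actors.Actor
import scala.actors.Actor._
import scala.collection.mutable.{ArrayBuffer => mArray}

case object Stop

// Gathers (file path, word count) pairs until every Processor has sent Stop
class Collector(val bin: mArray[(String, Int)], var tcount: Int) extends Actor {
  def act() {
    loop {
      receive {
        case (fn: String, count: Int) =>
          // the following "bin" object is what I ultimately need to get back
          bin.append((fn, count))
        case Stop =>
          // one Processor has finished; exit once all of them have
          tcount -= 1
          if (tcount == 0) exit()
      }
    }
  }
}

class Processor(col: Collector, flist: Seq[File]) extends Actor {
  def act() {
    for (fn <- flist) {
      // simple stand-in for the real word-counting code
      val src = scala.io.Source.fromFile(fn)
      val wcount =
        try src.getLines.map(_.split("\\s+").count(_.nonEmpty)).sum
        finally src.close()
      // send the path (a String) so it matches the Collector's pattern
      col ! ((fn.getPath, wcount))
    }
    col ! Stop
  }
}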

I can think of a few homemade ways to make the main method wait for the Collector to finish and then process the "bin" object.

But what is the proper Scala way to retrieve the "bin" above and hand it back to main, or to whatever needs it?


2 Answers


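With actors you don't "retrieve" anything, because calling an actor's methods directly from outside can be dangerous: the call bypasses the mailbox and can race with the actor's own message handling.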

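Instead, you let the actor send out the answer. You could add a `Get` case to the object, or, in your case, have it send `bin` somewhere from the handler for `Stop`.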
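A minimal sketch of the second suggestion, with one swap: scala.actors was deprecated in Scala 2.10 and is no longer part of current Scala, so a plain JDK thread draining a `LinkedBlockingQueue` stands in for the actor's mailbox here. The `Msg`/`Count` types and the `replyTo` queue are hypothetical names for illustration. What matters is the direction of data flow: when the last `Stop` arrives, the collector sends the finished `bin` to `replyTo`, and main just blocks on `replyTo.take()` instead of reaching into the collector.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Messages understood by the collector (hypothetical, mirroring the question)
sealed trait Msg
final case class Count(file: String, words: Int) extends Msg
case object Stop extends Msg

// Mailbox-driven stand-in for an actor: only the worker thread touches `bin`.
// When every producer has sent Stop, the finished bin is *sent* to `replyTo`.
final class Collector(producers: Int, replyTo: LinkedBlockingQueue[Vector[(String, Int)]]) {
  private val mailbox = new LinkedBlockingQueue[Msg]()
  private val bin = ArrayBuffer.empty[(String, Int)]

  // Asynchronous send, like an actor's `!`
  def !(m: Msg): Unit = mailbox.put(m)

  private val worker = new Thread(() => {
    var remaining = producers
    while (remaining > 0) {
      mailbox.take() match {
        case Count(f, n) => bin += ((f, n))
        case Stop        => remaining -= 1
      }
    }
    // the answer is pushed out, not retrieved by the caller
    replyTo.put(bin.toVector)
  })
  worker.start()
}
```

Each producer thread would send its `Count` messages followed by one `Stop`; main then calls `take()` on the reply queue and gets the complete bin.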

answered 2012-08-08 15:03

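You can send the actor a message with the `!!` method, which gives you a future result. The actor receiving the message fills in that result with `reply`.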
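The `!!`/`reply` pair amounts to a request that carries its own answer slot. A sketch of the same idea using current standard-library types, assuming a hypothetical `Get` message that carries a `Promise`: the collector completes the promise (the analog of `reply`), and the hypothetical `ask()` helper returns `p.future` (the analog of the future that `!!` returns). A JDK thread plus `LinkedBlockingQueue` stands in for the actor; this is not the scala.actors API itself.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{Future, Promise}

sealed trait Msg
final case class Add(file: String, words: Int) extends Msg
// The request carries the Promise the actor will complete
final case class Get(result: Promise[Vector[(String, Int)]]) extends Msg

final class Collector {
  private val mailbox = new LinkedBlockingQueue[Msg]()
  private var bin = Vector.empty[(String, Int)] // only touched by the worker thread

  def !(m: Msg): Unit = mailbox.put(m)

  // Hypothetical analog of `collector !! Get`: send a request, get a Future back
  def ask(): Future[Vector[(String, Int)]] = {
    val p = Promise[Vector[(String, Int)]]()
    this ! Get(p)
    p.future
  }

  private val worker = new Thread(() => {
    while (true) {
      mailbox.take() match {
        case Add(f, n) => bin = bin :+ ((f, n))
        case Get(p)    => p.success(bin) // the analog of `reply`
      }
    }
  })
  worker.setDaemon(true) // a blocked worker must not keep the JVM alive
  worker.start()
}
```

Because the mailbox is FIFO, a `Get` sent after some `Add` messages from the same thread sees all of them in the reply.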


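However, you can save yourself a lot of bookkeeping by using executors instead, which are better suited to parallelizing tasks (you don't really have a concurrency situation here, which is where actors are particularly useful).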

The following is based on Scala 2.10 (due soon; you can use 2.10.0-M6 right now), because it includes the improved concurrency framework:

import java.io.File
import concurrent._
import java.util.concurrent.Executors

// count words in a file - probably not the best possible way to write this
def wc(f: File): Int = {
  val src = io.Source.fromFile(f)
  try src.getLines.map(_.split(' ').filterNot(_ == "").size).sum
  finally src.close() // release the file handle even if reading fails
}

// utility method to get files within a directory (non-recursive!)
def filesInDir(dir: File): Seq[File] = dir.listFiles.toSeq.filter(_.isFile)

// process method which takes a list of files, spawns a word count for each,
// and collects the individual future results into a `Seq` whose future is returned
def process(files: Seq[File])(implicit exec: ExecutionContext)
: Future[Seq[(File, Either[Throwable, Int])]] = {
  val futs = files.map { f =>
    // `future` submits the body for asynchronous processing. In order to
    // gracefully handle IO errors, a successful result is wrapped in `Right`,
    // and an exception in `Left`. The caller can decide how to handle errors.
    future {
      f -> (try {
        Right(wc(f))
      } catch {
        case e: Throwable => Left(e)
      })
    }
  }
  // collect all the individual results into one result object.
  // the new single `Future` is only complete after all of the individual futures
  // have completed.
  Future.sequence(futs)
}

Example:

// use as many threads as there are processor cores
val poolSize = sys.runtime.availableProcessors()

// create a new executor context to be used for spawning
implicit val exec = ExecutionContext.fromExecutor(
  Executors.newFixedThreadPool(poolSize))

// call the process
val result = process(filesInDir(new File("/my/directory")))

// execute some body when the future has completed successfully
result.onSuccess { case counts =>
  counts foreach println
}
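To hand the result back to `main` rather than printing it from a callback, main can block on the combined future once, at the edge of the program. A sketch assuming Scala 2.13 or 3, where `future { ... }` has become `Future { ... }`; `Try` replaces the hand-written `Either` wrapping and `scala.util.Using` closes each file. `WordCounts` and `countAll` are hypothetical names, and the one-minute timeout is an arbitrary choice. Note the `finally pool.shutdown()`: threads from `Executors.newFixedThreadPool` are non-daemon, so without it the JVM would not exit.

```scala
import java.io.File
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.io.Source
import scala.util.{Try, Using}

object WordCounts {
  // Whitespace-separated word count; Using closes the file even on error
  def wc(f: File): Int =
    Using.resource(Source.fromFile(f)) { src =>
      src.getLines().map(_.split("\\s+").count(_.nonEmpty)).sum
    }

  // Same shape as `process`, with Try instead of Either for per-file errors
  def process(files: Seq[File])(implicit ec: ExecutionContext): Future[Seq[(File, Try[Int])]] =
    Future.sequence(files.map(f => Future(f -> Try(wc(f)))))

  // Waits for every count and returns the collected results to the caller
  def countAll(files: Seq[File], threads: Int): Seq[(File, Try[Int])] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
      Await.result(process(files), 1.minute)
    } finally pool.shutdown() // non-daemon pool threads would otherwise keep the JVM alive
  }
}
```

In `main` this would be something like `val counts = WordCounts.countAll(filesInDir(new File("/my/directory")), sys.runtime.availableProcessors())`, after which `counts` is an ordinary `Seq` that can be passed to any other method.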
answered 2012-08-08 16:48