0

我有一个很长的字符串序列,它们需要由某个处理函数单独处理,然后作为另一个序列对象收集。这个问题似乎非常适合 fork/join 类型的攻击。

该函数是一个实例化非常昂贵的类的成员。但是在期货之间实例化和共享单个类对象似乎会导致问题,因此我实例化了可用处理器数量的 4 倍,然后将它们拆分到期货之间。

// instantiate processing class objects
val processors = 1 to SomeNumber map (x=> new MyProcessor)
val processorstream = Stream.continually(processors).flatten
// the input strings
val input: Seq[String] = some sequence of strings
val splitinput = input.grouped(some large number).toStream
// create futures
val mytask = splitinput.zip(processorstream).collect {
    case (subseq of strings, processor) => future {
        map elements of subsequence of strings with processor}}

然后我像这样收集输出

val result = mytask.map(x => x.apply()).reduce(_++_) // or some appropriate concatenation operator

我的问题是,即使我有 8 个内核,这也不能让我充分利用 cpu。它只使用一个核心。

为了调查,我尝试过的另一种选择是

val input: Seq[String] = some sequence of strings
// no stage where I split the input into subsequences
val mytask = input.zip(processorstream).collect {
    case (string, processor) => future {
        process string with processor}}
val result = mytask.map(x => x.apply())

这种替代方法既有效又无效。它实现了充分的 cpu 利用率,但抛出了几个异常,因为(假设)处理器在每个字符串中运行的速度太快,有时相同的处理器对象会同时应用于不同的字符串。

我更加确信我的假设是处理器工作速度太快,因为如果我提供更长的输入(例如,整个文本文档而不是 10 个单词的标题),我可以充分利用 CPU,而不会抛出任何异常。

我还尝试过使用 akka 期货和 scalaz 承诺,当我将输入序列拆分为子序列时,它们似乎都只使用一个 cpu。

那么,在这种情况下,如何在使用字符串子序列作为输入的同时充分利用期货?

4

2 回答 2

2

每 @om-nom-nom :

input.par.map { s => task(s) }
于 2012-08-18T20:08:39.827 回答
0

您可以尝试使用ThreadLocal可变处理器。相当无用的例子:

val words = io.Source.fromFile("/usr/share/dict/words").getLines.toIndexedSeq

class Processor {
  val sb = new StringBuffer() // mutable!
  println("---created processor---")
  def map(s: String): Int = {
    sb.setLength(0)
    for (i <- 1 to s.length()) {
      sb.append(s.substring(0, i))
    }
    sb.toString().sum.toInt  // don't make any sense out of this
  }
}

val tl = new ThreadLocal[Processor] {
  override protected def initialValue() = new Processor
}

val parRes = words.par.map(w => tl.get.map(w)).sum
val serRes = words.map(    w => tl.get.map(w)).sum
assert(parRes == serRes)

---created processor---正如消息所证明的那样,默认情况下,这将创建与 CPU 内核一样多的线程。

于 2012-08-18T20:41:23.160 回答