我有一个很长的字符串序列,它们需要由某个处理函数单独处理,然后作为另一个序列对象收集。这个问题似乎非常适合 fork/join 类型的攻击。
该函数是一个实例化非常昂贵的类的成员。但是在期货之间实例化和共享单个类对象似乎会导致问题,因此我实例化了可用处理器数量的 4 倍,然后将它们拆分到期货之间。
// instantiate processing class objects
val processors = 1 to SomeNumber map (x=> new MyProcessor)
val processorstream = Stream.continually(processors).flatten
// the input strings
val input: Seq[String] = some sequence of strings
val splitinput = input.grouped(some large number).toStream
// create futures
val mytask = splitinput.zip(processorstream).collect {
case (subseq of strings, processor) => future {
map elements of subsequence of strings with processor}}
然后我像这样收集输出
val result = mytask.map(x => x.apply()).reduce(_++_) // or some appropriate concatenation operator
我的问题是,即使我有 8 个内核,这也不能让我充分利用 cpu。它只使用一个核心。
为了调查,我尝试过的另一种选择是
val input: Seq[String] = some sequence of strings
// no stage where I split the input into subsequences
val mytask = input.zip(processorstream).collect {
case (string, processor) => future {
process string with processor}}
val result = mytask.map(x => x.apply())
这种替代方法既有效又无效。它实现了充分的 cpu 利用率,但抛出了几个异常,因为(假设)处理器在每个字符串中运行的速度太快,有时相同的处理器对象会同时应用于不同的字符串。
我更加确信我的假设是处理器工作速度太快,因为如果我提供更长的输入(例如,整个文本文档而不是 10 个单词的标题),我可以充分利用 CPU,而不会抛出任何异常。
我还尝试过使用 akka 期货和 scalaz 承诺,当我将输入序列拆分为子序列时,它们似乎都只使用一个 cpu。
那么,在这种情况下,如何在使用字符串子序列作为输入的同时充分利用期货?