7

我正在尝试使用分而治之(又名 fork/join)方法来解决数字运算问题。这是代码:

import scala.actors.Futures.future

private def compute( input: Input ):Result = {
  if( pairs.size < SIZE_LIMIT ) {
    computeSequential()
  } else {
    val (input1,input2) = input.split
    val f1 = future( compute(input1) )
    val f2 = future( compute(input2) )
    val result1 = f1()
    val result2 = f2()
    merge(result1,result2)
  }
}

它运行(具有很好的加速),但未来的 apply 方法似乎阻塞了一个线程并且线程池极大地增加了。当创建太多线程时,计算就会卡住。

有没有一种释放线程的期货反应方法?或任何其他方式来实现这种行为?

编辑:我正在使用 scala 2.8.0.final

4

1 回答 1

8

不要声称(应用)您Future的 s,因为这会迫使他们阻止并等待答案;如您所见,这可能导致死锁。相反,单子使用它们来告诉他们完成后要做什么。代替:

val result1 = f1()
val result2 = f2()
merge(result1,result2)

尝试这个:

for {
  result1 <- f1
  result2 <- f2
} yield merge(result1, result2)

其结果将是包含合并结果的Responder[Result](本质上是 a );您可以使用orFuture[Result]对这个最终值做一些有效的事情,或者您可以将它或它到另一个。无需阻塞,只需为未来安排计算!respond()foreach()map()flatMap()Responder[T]

编辑1:

好的,compute函数的签名将不得不更改为Responder[Result]现在,那么这对递归调用有何影响?让我们试试这个:

private def compute( input: Input ):Responder[Result] = {
  if( pairs.size < SIZE_LIMIT ) {
    future(computeSequential())
  } else {
    val (input1,input2) = input.split
    for {
      result1 <- compute(input1)
      result2 <- compute(input2)
    } yield merge(result1, result2)
  }
}

现在您不再需要包装对computewith的调用,future(...)因为它们已经返回Responder(的超类Future)。

编辑2:

使用这种延续传递风格的一个结果是,您的顶级代码——无论compute最初调用什么——不再阻塞。如果它是从 调用的main(),而这就是程序所做的一切,这将是一个问题,因为现在它只会产生一堆期货,然后在完成所有被告知要做的事情后立即关闭。你需要做的是block所有这些期货,但只有一次,在顶层,而且只对所有计算的结果,而不是任何中间计算。

不幸的是,这个Responder被返回的东西compute()不再有apply()像之前那样的阻塞方法Future。我不确定为什么 flatMapping Futures 会产生泛型Responder而不是Future; 这似乎是一个 API 错误。但无论如何,您应该能够自己制作:

def claim[A](r:Responder[A]):A = {
  import java.util.concurrent.ArrayBlockingQueue
  import scala.actors.Actor.actor

  val q = new ArrayBlockingQueue[A](1)
  // uses of 'respond' need to be wrapped in an actor or future block
  actor { r.respond(a => q.put(a)) } 
  return q.take
}

所以现在你可以在你的main方法中创建一个阻塞调用来计算,如下所示:

val finalResult = claim(compute(input))
于 2010-10-04T08:40:04.097 回答