5

在 Scala 2.8 中,当我启动演员时,我可以通过消息传递进行通信。这反过来意味着我可以发送最终的 Exit() 消息或任何我认为适合我的协议的消息。

但是我将如何检查演员是否已经退出?我可以很容易地想象自己有一个任务,其中一个主演员启动一些工人演员然后只是等待答案,每次检查这是否是最终答案(即是否有任何演员仍在工作或他们都退出了?)。

当然,我可以让他们都发回“我完成了”的消息,然后计算他们,但这有点不满意。

测试worker-actors完成时的最佳实践是什么?

编辑#1

我正在研究期货,但遇到了麻烦。有人可以解释为什么这段代码不起作用:

package test
import scala.actors.Futures._

object FibFut extends Application{
    
    def fib(i:Int):Int = 
        if(i<2)
            1
        else
            fib(i-1)+fib(i-2)
            
    val f = future{ fib(3) }
    
    println(f())    
        
}

如果我在 future-body 中定义函数 fib,它就可以工作。它必须是一个范围的东西,但我没有得到任何上述错误,它只是挂起。任何人?

编辑#2

扩展 Application 似乎不是一个好方法。定义一个主要方法使一切正常。下面的代码是我一直在寻找的,所以Futures竖起大拇指 :)

package test

import scala.actors.Futures._

object FibFut {

  def fib(i: Int): Int = if (i < 2) 1 else fib(i - 1) + fib(i - 2)

  def main(args: Array[String]) {

    val fibs = for (i <- 0 to 50) yield future { fib(i) }

    for (future <- fibs) println(future())

  }

}
4

3 回答 3

3

就个人而言,我是“我完成了”信息的粉丝;这是管理工作分配的好方法,而且作为奖励,您已经知道所有孩子何时完成了他们正在做的事情。

但是,如果您真的只是想一次性完成一些工作并等到一切准备就绪,请查看scala.actors.Futures. 您可以要求它进行一些计算:

val futureA = Futures.future {
  val a = veryExpensiveOperation
  (a,"I'm from the future!")
}

然后你可以等待一切完成,如果你已经提出了多个请求:

Futures.awaitAll(600*1000, futureA, futureB, futureC, futureD)
// Returns once all of A-D have been computed
val actualA = futureA()   // Now we get the value
于 2010-11-28T20:19:55.297 回答
2

不久前,我写了一篇关于在 Scala 中链接演员的帖子。Actor 链接是在 Erlang、Scala Actors 和其他 Actor 库中监视 Actor 的一种惯用 [也是最简单的] 方法。默认情况下,当您链接 2 个演员时,其中一个死亡,另一个立即死亡(除非演员陷阱/处理退出信号):

scala> case object Stop
defined module Stop

scala>

scala> val actor1 = actor {
     |    loop {
     |       react {
     |          case Stop =>
     |             println("Actor 1: stop")
     |             exit()
     |          case msg => println(msg)
     |             }
     |         }
     | }
actor1: scala.actors.Actor = scala.actors.Actor$$anon$1@1feea62

scala>

scala> val actor2 = actor {
     |    link(actor1)
     |    self.trapExit = true
     |    loop {
     |       react {
     |          case msg => println(msg)
     |             }
     |         }
     | }
actor2: scala.actors.Actor = scala.actors.Actor$$anon$1@1e1c66a

scala> actor1.start
res12: scala.actors.Actor = scala.actors.Actor$$anon$1@1feea62

scala> actor2.start
res13: scala.actors.Actor = scala.actors.Actor$$anon$1@1e1c66a

scala> actor1 ! Stop
Actor 1: stop

scala> Exit(scala.actors.Actor$$anon$1@1feea62,'normal)  // Actor 2 received message, when Actor1 died

更复杂和灵活的方式是使用监督者(Erlang 中的监督者行为, Akka Actors 库中的演员监督者等)。主管(本身就是一个演员)监视许多其他演员,并根据指定的策略重新启动它们(如果一个演员死亡,则重新启动所有演员;当一个演员死亡时,只重新启动一个演员)。

于 2010-11-28T17:22:41.080 回答
0

大家好,我想出了一个使用actor类的getState函数的解决方案。在解决方案中,我使用了这个线程的一个想法:窥视 使用 reactWithin(0) 的 Scala Actor 邮箱的最佳方法。我在使用 react 和 loop 时遇到了麻烦,程序只会阻止大计算。这是通过用 while(true) 替换 loop 和用 receiveWithin(int) 替换 reactWithin(int) 来解决的。

我的解决方案如下所示(当心,bigass code-lump):

package test

import scala.actors._
import scala.actors.Actor.State._

case class Computation(index: Int, a: () ⇒ Int)
case class Result(i: String)
object Main {
  def main(args: Array[String]) {
    val m = new Master
    m.start
  }
}

class Master extends Actor {

  val N = 40
  var numberOfAnswers = 0

  def fib(x: Int): Int =
    if (x < 2)
      1
    else
      fib(x - 1) + fib(x - 2)

  val computers = for (i ← 0 to N) yield new Computer

  def act {

    for (i ← 0 until computers.size) {
      computers(i).start
      computers(i) ! Computation(i, () => fib(i))
    }

    println("done Initializing actors")
    while (true) {
      receiveWithin(1000) {

        case Result(i) =>
          val numberDone = computers.map(_.getState == Terminated).filter(_ == true).length
          println(i)
          numberOfAnswers += 1

        case TIMEOUT =>
          val allDone = computers.map(_.getState == Terminated).reduceRight(_ && _)
          println("All workers done?:" + allDone)
          println("# of answers:" + numberOfAnswers)
          if (allDone)
            exit()
      }
    }

  }

}

class Computer extends Actor {

  def act {
    loop {
      react {
        case Computation(i, f) ⇒
          sender ! Result("#" + i + " Res:" + f())
          exit()
      }
    }
  }

}

该程序计算斐波那契数(以最坏的方式)。这个想法只是为了测试大工作负载的多线程利用率。以下行检查是否某个参与者尚未终止:

computers.map(_.getState == Terminated).reduceRight(_ && _)

其中计算机的类型为 IndexedSeq[Computer]。诀窍是使用 TIMEOUT 消息,我可以定期检查所有工作是否已完成并采取相应措施(在这种情况下,当没有更多活动的工作人员时退出)。我利用了每个工作人员在退出之前发送结果的事实。通过这种方式,我知道我将始终收到结果并在它们显示为已终止之前处理它们。

当我使用 react 和 loop 而不是 while(true) 和接收时,有人可以评论程序“锁定”(停止接收消息)的事实吗?

于 2010-11-29T14:30:33.177 回答