1

我对 Akka 很陌生,所以我的问题可能看起来很简单:

我有一个workerA使用FSM的演员,因此可以处于这两种状态Finished,并且Computing

sealed trait State
case object Finished extends State
case object Computing extends State

sealed trait Data
case object Uninitialized extends Data
case class Todo(target: ActorRef, queue: immutable.Seq[Any]) extends Data

收到时workerAGetResponse应该回答 if 且 if only 它处于 state Finished

这样做的正确方法是什么?我知道我们应该避免在这种范式中被阻塞,但这里只关注顶级参与者。谢谢

4

2 回答 2

1

我不一定确定您在这里甚至需要FSM。当您有许多状态以及这些状态之间的许多可能(并且可能很复杂)的状态转换时,FSM 是一个非常好的工具。就您而言,如果我理解正确,您基本上有两种状态;收集数据并完成。似乎也只有一个状态转换,从gathering -> finished. 如果我这一切都正确,那么我将建议您简单地使用become来解决您的问题。

我在下面有一些代码来展示我所描述的一个简单的例子。基本思想是主角将一些工作分给一些工人,然后等待结果。如果有人在工作完成时询问结果,演员会隐藏该请求,直到工作完成。完成后,演员将回复任何要求结果的人。代码如下:

case object GetResults
case class Results(ints:List[Int])
case object DoWork

class MainActor extends Actor with Stash{
  import context._

  override def preStart = {
    val a = actorOf(Props[WorkerA], "worker-a")
    val b = actorOf(Props[WorkerB], "worker-b")
    a ! DoWork
    b ! DoWork
  }

  def receive = gathering(Nil, 2)

  def gathering(ints:List[Int], count:Int):Receive = {
    case GetResults => stash()
    case Results(i) =>      
      val results = i ::: ints
      val newCount = count - 1
      if (newCount == 0){
        unstashAll()
        become(finished(results))        
        child("worker-a") foreach (stop(_))
        child("worker-b") foreach (stop(_))
      }
      else
        become(gathering(results, newCount))
  } 

  def finished(results:List[Int]):Receive = {
    case GetResults => sender ! results
  }
}

class WorkerA extends Actor{
  def receive = {
    case DoWork =>

      //Only sleeping to simulate work.  Not a good idea in real code
      Thread sleep 3000
      val ints = for(i <- 2 until 100 by 2) yield i
      sender ! Results(ints.toList)
  }
}

class WorkerB extends Actor{
  def receive = {
    case DoWork =>

      //Only sleeping to simulate work.  Not a good idea in real code
      Thread sleep 2000  
      val ints = for(i <- 1 until 100 by 2) yield i
      sender ! Results(ints.toList)      
  }
}

然后您可以按如下方式对其进行测试:

val mainActor = system.actorOf(Props[MainActor])
val fut = mainActor ? GetResults
fut onComplete (println(_))
于 2013-10-16T13:20:11.370 回答
0

您可以对 FSM 状态进行模式匹配:

// insert pattern matching stuff instead of ...

class MyActor extends Actor with FSM[State, Message] {
  startWith(Finished, WaitMessage(null))

  when(Finished) {
    case Event(Todo(... =>
      // work
      goto(Computing) using Todo(...)
    case Event(GetResponse(... =>
      // reply: sender ! msg // or similar
  }

  /* the rest is optional. You can use onTransition below to send yourself a message to    report status of the job: */
  when(Busy) {
    case Event(Finished(... =>
      // reply to someone: sender ! msg // or similar
      goto(Finished)
  }

  onTransition {
    case Finished -> Computing =>
      // I prefer to run stuff here in a future, and then send a message to myself to signal the end of the job:
      self ! Finished(data)
  }

更具体地解决问题的编辑:

class MyActor extends Actor with FSM[State, Message] {
  startWith(Finished, WaitMessage(null))

  when(Finished) {
    case Event(Todo(... =>
      // work
      goto(Computing) using Todo(...)
    case Event(GetResponse(... =>
      // reply: sender ! msg // or similar
      stay
  }

  initialize()
}
于 2013-10-15T14:49:41.470 回答