0

我用 FSM 描述的演员正在等待触发器(处于空闲状态)。当它得到它时,它开始处理一些数据(并进入运行状态),完成后它又回到空闲状态。

如果我正确理解 FSM 模型,那么从这个角度来看,有两个事件:处理开始(空闲->运行)和处理完成(运行->空闲)。

但从演员的角度来看,只有一条信息。

一种可能性是将处理本身委托给另一个参与者。所以我可以转发触发事件并进入运行状态,然后在得到结果时进入空闲状态。它的优点是 FSM 本身可以快速响应请求(例如询问当前状态是什么),但它使设计更加复杂。

另一种是在actor完成处理后向self发送一条完成的消息,这将触发Running -> Idle转换,但对我来说看起来有点奇怪。

我还有什么其他选择?

注意:还有其他几个具有各种转换的状态,所以我想坚持 FSM 模型。

4

1 回答 1

4

由于您似乎有一个演员需要进行处理并转换到 FSM,我建议您遵循以下准则(一些基本代码大纲遵循此列表):

  • 参与者 A 接收到将触发某些处理的相关消息
  • 参与者 A 将一个单独的 FSM 参与者,比如 F,( akka.actor.FSM) 转换到适当的状态。Actor A 在启动时会生成特定的 FSM 以跟踪相应上下文的状态(例如,所有事务或每个事务的状态或某些其他上下文)。下面的代码大纲使用所有处理或完成的事务作为示例的上下文,但可能需要更改。
  • 然后参与者 A 触发应该为消息触发的任何处理。请记住,actor 通常不应该阻塞,但这里有一个答案,它提供了关于Akka actor 何时可以阻塞的更多指导。
  • 替代方案:如果您可以在不阻塞的情况下触发长时间运行的处理,并确保您在对方处理阶段后收到必要的事件,那么您可以消除前面的 Actor A 并只拥有 FSM Actor F。您应该看看onTransition这种情况。

所以我的代码大纲建议是基于我从问题中理解的:

/* Events */
sealed trait MyEvents
case class ProcessingStarted(txnId: Long) extends MyEvents
case class ProcessingFinished(txnId: Long, result: Result) extends MyEvents

/* Valid states for your FSM */
sealed trait MyStates 
case object Idle extends MyStates
/* Constructor arguments could be anything, I randomly chose a Long for a transaction ID which may be specific to a job */
case object Processing extends MyStates
/* define more case classes or objects depending on the rest of the states */

/* Valid internal state data types for FSM */
sealed trait MyDataTypes
case object Uninitialized extends MyDataTypes
case class StateData(processingIds: Seq[Long], resultMap: Map[Long, Result]) extends MyDataTypes

import akka.actor.{ Actor, ActorRef, FSM }
import scala.concurrent.duration._

 class ActorF extends FSM[MyStates, MyDataTypes] {
   startWith(Idle, Uninitialized)

   when(Idle) {
     case Event(ProcessingStarted(txnId), Uninitialized) =>
       goto(Processing) using StateData(Seq(txnId), Map.empty[Long, Result])
     case Event(ProcessingStarted(txnId), StateData(emptyIds, resultMap)) =>
       goto(Processing) using StateData(Seq(txnId), resultMap)
   }

   when(Processing) {
     case Event(ProcessingFinished(txnId, result), StateData(processingIds, resultMap)) => {
       val remainingIds = processingIds diff Seq(txnId)
       val newResultMap = resultMap + (txnId -> result)
       if (remainingIds.isEmpty) {
         goto(Idle) using StateData(remainingIds, newResultMap)
       } else {
         stay using StateData(remainingIds, newResultMap)
       }
     }
   }

   initialize()
 }

 // inside Actor A do something like this for creating the FSM actor (just like other actors)
 val f = system.actorOf(Props(classOf[ActorF], this))

 // send an event message to it just like other types of actors
 f ! ProcessingStarted(txnId)

如果您选择触发对代码其他部分的非阻塞处理请求,您可以根据onTransition需要添加触发代码。您可能还希望将 MyEvents 案例类更新为不同的时态。上面使用的事件命名是为了表明其他东西负责触发该事件(例如,Actor A 收到了初始消息以执行某些操作)。

还要注意Akka 的监督能力,可以在这里用来监督有问题的演员。

有关更多详细信息,请阅读以下内容,这可能有助于进一步构建 FSM、对其进行测试、使用非阻塞方法在外部触发长时间运行的处理。所有这些都可能对您的需求有用:

于 2013-09-30T12:47:34.927 回答