由于您似乎有一个演员需要进行处理并转换到 FSM,我建议您遵循以下准则(一些基本代码大纲遵循此列表):
- 参与者 A 接收到将触发某些处理的相关消息
- 参与者 A 将一个单独的 FSM 参与者,比如 F,(
akka.actor.FSM
) 转换到适当的状态。Actor A 在启动时会生成特定的 FSM 以跟踪相应上下文的状态(例如,所有事务或每个事务的状态或某些其他上下文)。下面的代码大纲使用所有处理或完成的事务作为示例的上下文,但可能需要更改。
- 然后参与者 A 触发应该为消息触发的任何处理。请记住,actor 通常不应该阻塞,但这里有一个答案,它提供了关于Akka actor 何时可以阻塞的更多指导。
- 替代方案:如果您可以在不阻塞的情况下触发长时间运行的处理,并确保您在对方处理阶段后收到必要的事件,那么您可以消除前面的 Actor A 并只拥有 FSM Actor F。您应该看看
onTransition
这种情况。
所以我的代码大纲建议是基于我从问题中理解的:
/* Events */
sealed trait MyEvents
case class ProcessingStarted(txnId: Long) extends MyEvents
case class ProcessingFinished(txnId: Long, result: Result) extends MyEvents
/* Valid states for your FSM */
sealed trait MyStates
case object Idle extends MyStates
/* Constructor arguments could be anything, I randomly chose a Long for a transaction ID which may be specific to a job */
case object Processing extends MyStates
/* define more case classes or objects depending on the rest of the states */
/* Valid internal state data types for FSM */
sealed trait MyDataTypes
case object Uninitialized extends MyDataTypes
case class StateData(processingIds: Seq[Long], resultMap: Map[Long, Result]) extends MyDataTypes
import akka.actor.{ Actor, ActorRef, FSM }
import scala.concurrent.duration._
class ActorF extends FSM[MyStates, MyDataTypes] {
startWith(Idle, Uninitialized)
when(Idle) {
case Event(ProcessingStarted(txnId), Uninitialized) =>
goto(Processing) using StateData(Seq(txnId), Map.empty[Long, Result])
case Event(ProcessingStarted(txnId), StateData(emptyIds, resultMap)) =>
goto(Processing) using StateData(Seq(txnId), resultMap)
}
when(Processing) {
case Event(ProcessingFinished(txnId, result), StateData(processingIds, resultMap)) => {
val remainingIds = processingIds diff Seq(txnId)
val newResultMap = resultMap + (txnId -> result)
if (remainingIds.isEmpty) {
goto(Idle) using StateData(remainingIds, newResultMap)
} else {
stay using StateData(remainingIds, newResultMap)
}
}
}
initialize()
}
// inside Actor A do something like this for creating the FSM actor (just like other actors)
val f = system.actorOf(Props(classOf[ActorF], this))
// send an event message to it just like other types of actors
f ! ProcessingStarted(txnId)
如果您选择触发对代码其他部分的非阻塞处理请求,您可以根据onTransition
需要添加触发代码。您可能还希望将 MyEvents 案例类更新为不同的时态。上面使用的事件命名是为了表明其他东西负责触发该事件(例如,Actor A 收到了初始消息以执行某些操作)。
还要注意Akka 的监督能力,可以在这里用来监督有问题的演员。
有关更多详细信息,请阅读以下内容,这可能有助于进一步构建 FSM、对其进行测试、使用非阻塞方法在外部触发长时间运行的处理。所有这些都可能对您的需求有用: