我有一个用例,一旦请求数量达到指定值,我必须使用 akka fsm 处理请求。
sealed trait State
case object Idle extends State
case object Active extends State
sealed trait Data
case object Uninitialized extends Data
case object QuickStart extends Data
case class A(a: Int) extends Data
class RequestHandlers extends FSM[State, Data] {
val queue = mutable.Queue[A]()
startWith(Idle, Uninitialized)
when(Idle) {
case Event(_, Uninitialized) =>
println("At Idle")
// self ! QuickStart
goto(Active) using QuickStart
}
when(Active) {
case Event(_, request: A) =>
println("At Active")
queue.take(2).map{x => println("request--- " + x.a + "processing")
queue.dequeue()
}
Thread.sleep(2000L)
goto(Active) using Uninitialized
}
whenUnhandled {
case Event(update: A, QuickStart) =>
queue += update
if(queue.size >= 2) {
println(s"At unhandled + ${update}" + "--" + queue)
goto(Active) using update
}
else {
println("size has not reached")
goto(Active) using Uninitialized
}
case Event(update: A, Uninitialized) =>
queue += update
println(s"At unhandled - Uninitialised + $update")
goto(Active) using QuickStart
}
initialize()
}
object demo extends App {
val actorSystem = ActorSystem("system")
val actor = actorSystem.actorOf(Props(classOf[RequestHandlers]))
val list = (1 to 10).toList
list.foreach { abc =>
actor ! Uninitialized
actor ! A(abc)
println("Sent")
}
}
我尝试在添加请求的地方使用可变队列。在队列大小达到某个值后,即 2 同时处理这些请求。处理后,我将其出列。如果我发送 10 个请求,它将处理 8 个请求,但对于最后 2 个请求,它永远不会进入活动状态。在过渡期间,我没有得到我犯错误的地方。
任何帮助将不胜感激!