我有以下演员设置,使用 Akka 演员(2.10)
A -spawn-> B -spawn-> C
A -sendWork-> B -sendWork-> C
C -sendResults-> A(重复)
但是,在某些时候,A 注意到它应该更改发送给 B/C 的工作负载,因为 C 正在发送大量无用的消息。但是,在这种情况下,C 的收件箱似乎很满,和/或 C 可能被阻止。
A如何告诉B立即关闭C?丢失 B 和 C 的状态和消息是可以接受的,因此可以选择销毁它们并生成新的。
我有以下演员设置,使用 Akka 演员(2.10)
A -spawn-> B -spawn-> C
A -sendWork-> B -sendWork-> C
C -sendResults-> A(重复)
但是,在某些时候,A 注意到它应该更改发送给 B/C 的工作负载,因为 C 正在发送大量无用的消息。但是,在这种情况下,C 的收件箱似乎很满,和/或 C 可能被阻止。
A如何告诉B立即关闭C?丢失 B 和 C 的状态和消息是可以接受的,因此可以选择销毁它们并生成新的。
鉴于演员是按照您描述的方式开始的,那么stop
以正确的方式使用将满足您的要求。根据文档,调用 stop 将:
1)阻止其他消息进入邮箱(发送到死信)
2)获取邮箱的当前内容并将其发送到死信(虽然这是基于邮箱实现,但关键是它们不会被处理)
现在,如果演员需要在它完全停止之前完全完成它当前正在处理的消息,所以如果它“卡住”,停止(或与此相关的任何事情)不会解决这个问题,但我不认为那是你描述的情况。
我将一个小代码示例放在一起进行演示。基本上,A 将向 B 发送消息以开始向 C 发送工作。B 将向 C 发送一些工作,C 会将工作的结果发送回 A。当 A 收到一定数量的响应时,它将通过停止 B 来触发 B 和 C 的停止。当 B 完全停止时,它将再次重新启动该过程,总共最多 2 次,因为它自己停止了。代码如下所示:
case object StartWork
case class DoWork(i:Int, a:ActorRef)
case class WorkResults(i:Int)
class ActorA extends Actor{
import context._
var responseCount = 0
var restarts = 0
def receive = startingWork
def startingWork:Receive = {
case sw @ StartWork =>
val myb = actorOf(Props[ActorB])
myb ! sw
become(waitingForResponses(myb))
}
def waitingForResponses(myb:ActorRef):Receive = {
case WorkResults(i) =>
println(s"Got back work results: $i")
responseCount += 1
if (responseCount > 200){
println("Got too many responses, terminating children and starting again")
watch(myb)
stop(myb)
become(waitingForDeath)
}
}
def waitingForDeath:Receive = {
case Terminated(ref) =>
restarts += 1
if (restarts <= 2){
println("children terminated, starting work again")
responseCount = 0
become(startingWork)
self ! StartWork
}
else{
println("too many restarts, stopping self")
context.stop(self)
}
}
}
class ActorB extends Actor{
import concurrent.duration._
import context._
var sched:Option[Cancellable] = None
override def postStop = {
println("stopping b")
sched foreach (_.cancel)
}
def receive = starting
def starting:Receive = {
case sw @ StartWork =>
val myc = context.actorOf(Props[ActorC])
sched = Some(context.system.scheduler.schedule(1 second, 1 second, self, "tick"))
become(sendingWork(myc, sender))
}
def sendingWork(myc:ActorRef, a:ActorRef):Receive = {
case "tick" =>
for(j <- 1 until 1000) myc ! DoWork(j, a)
}
}
class ActorC extends Actor{
override def postStop = {
println("stopping c")
}
def receive = {
case DoWork(i, a) =>
a ! WorkResults(i)
}
}
它的边缘有点粗糙,但它应该表明从 B 到 C 的级联停止将阻止 C 将响应发送回 A,即使它仍然在邮箱中有消息。我希望这就是你要找的。