当使用默认 Akka 系统调度程序上的方法(context().system().scheduler().schedule()
从参与者内部)以及接受目标参与者的重载之一时,我是否需要在目标参与者停止时使用返回的 Cancellable 显式取消以释放资源?
我想调度程序可能是watch()
目标参与者并自动执行清理,但在文档中的任何地方都找不到明确的状态。
当使用默认 Akka 系统调度程序上的方法(context().system().scheduler().schedule()
从参与者内部)以及接受目标参与者的重载之一时,我是否需要在目标参与者停止时使用返回的 Cancellable 显式取消以释放资源?
我想调度程序可能是watch()
目标参与者并自动执行清理,但在文档中的任何地方都找不到明确的状态。
Scheduler.schedule
采用 an的变体不会监视该演员(相对于计时器任务而言,这将具有相当高的开销),因此您应该始终从演员的钩子中ActorRef
清理循环计时器。postStop
在本地情况下,我们检查target.isTerminated
,但该方法总是返回false
不属于沼泽标准本地类型的演员引用,因此您只能在特定情况下依赖此功能,然后当您扩展您的代码时,您的代码将停止正常工作应用。另一个考虑因素是,在尝试发送消息时会运行上述检查,这可能是“软泄漏”(即延迟清理),以防调度较长(根据用例,100 毫秒可能已经很长)。
看起来任务仍在运行,并且消息很可能最终会变成一纸空文。您可以使用以下代码示例查看该行为:
import akka.actor._
import scala.concurrent.duration._
object SchedTest {
def main(args: Array[String]) {
val sys = ActorSystem("test")
val ref = sys.actorOf(Props[MyActor])
val can = sys.scheduler.schedule(1 second, 1 second){
println("executing")
ref ! "foo"
}(sys.dispatcher)
Thread.sleep(10000)
sys.stop(ref)
Thread.sleep(5000)
can.cancel
}
}
class MyActor extends Actor{
def receive = {
case _ =>
println("received message...")
}
}
10 秒后,您将不再看到“received message...”字符串被打印,但您将继续看到“executing”字符串被打印。然后在我手动终止任务后,您不再看到“已执行”被打印出来。
我没有对此进行测试,但这是直接从文档中复制和粘贴的(虽然是 2.2-M3)。
final def schedule(
initialDelay: FiniteDuration,
interval: FiniteDuration,
receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext,
sender: ActorRef = Actor.noSender): Cancellable =
schedule(initialDelay, interval, new Runnable {
def run = {
receiver ! message
if (receiver.isTerminated)
throw new SchedulerException("timer active for terminated actor")
}
})
这个问题已经很老了,但我想提一下,你通常可以避免重复的时间表。
在许多情况下,actor 希望稍后向自己发送消息。在这种情况下,一个有用的模式是不使用schedule
,而是让消息处理程序使用 为自己安排消息scheduleOnce
。
class MyActor extends Actor {
def receive = {
case "Message" =>
context.system.scheduler.scheduleOnce(5.seconds, self, "Message")
}
}
调度程序将在actor停止后尝试一次消息传递,但由于actor是本地的,它甚至不会发送消息(参见Roland Kuhn的回答)。
如果您有大量需要预定消息的参与者,您可能需要查看其他答案,但上述方法通常就足够了。您甚至可以让间隔根据负载等因素而变化。