5

在我的场景中,我有 2 个演员:

  1. watchee(我用TestProbe
  2. watcherWatcher包裹起来TestActorRef以暴露state我在测试中跟踪的一些内部)

watchee观察者在死亡时应该采取一些行动。

这是我到目前为止编写的完整测试用例:

class TempTest(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with FunSuiteLike with Matchers with BeforeAndAfterAll {

  def this() = this(ActorSystem("TempTest"))

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  class WatcherActor(watchee: ActorRef) extends Actor {

    var state = "initial"
    context.watch(watchee)

    override def receive: Receive = {
      case "start" =>
        state = "start"
      case _: Terminated =>
        state = "terminated"
    }

  }

  test("example") {
    val watchee = TestProbe()
    val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))

    assert(watcher.underlyingActor.state === "initial")

    watcher ! "start" // "start" will be sent and handled by watcher synchronously
    assert(watcher.underlyingActor.state === "start")

    system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
    Thread.sleep(100) // what is the best way to avoid blocking here?
    assert(watcher.underlyingActor.state === "terminated")
  }

}

现在,由于所有涉及的参与者都使用CallingThreadDispatcher(所有 Akka 的测试助手都是使用带有 的道具构建的.withDispatcher(CallingThreadDispatcher.Id)),我可以安全地假设当这个语句返回时:

watcher ! "start"

...“开始”消息已经被处理WatchingActor,因此我可以根据watcher.underlyingActor.state

但是,根据我的观察,当我停止watchee使用system.stop或向Kill它发送消息时,作为死亡Terminated副作用产生的消息会在另一个线程中异步执行。watchee

不是解决方案是停止watchee,阻塞线程一段时间并Watcher在此之后验证状态,但我想知道如何以正确的方式做到这一点(即如何确保在杀死演员之后它的观察者收到并处理 Terminated消息表明它已经死亡)?

4

2 回答 2

5

解决此问题的一种方法是在您的测试中引入另一个观察者,该观察者也观察watchee. 这个另一个观察者是一个TestProbe允许我们对其执行断言的方法,这将消除您所看到的时间问题。一、修改后的测试代码:

 val watchee = TestProbe()
 val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))
 val probeWatcher = TestProbe()
 probeWatcher watch watchee.ref

 assert(watcher.underlyingActor.state === "initial")

 watcher ! "start" // "start" will be sent and handled by watcher synchronously
 assert(watcher.underlyingActor.state === "start")

 system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
 probeWatcher.expectTerminated(watchee.ref)
 assert(watcher.underlyingActor.state === "terminated")

所以你可以看到我已经引入了额外的观察者:

val probeWatcher = TestProbe()
probeWatcher watch watchee.ref

然后,在代码的后面,在最终的断言对你失败之前,我使用另一个断言让我知道Terminated停止的参与者的消息已经正确分发:

probeWatcher.expectTerminated(watchee.ref)

当代码越过这一行时,我可以确定watcher被测对象也收到了它的终止消息,并且要遵循的断言将通过。

编辑

正如 OP 所指出的,此代码存在一定程度的不确定性。另一种可能的解决方案是将测试代码中阻止参与者的行更改为:

watcher.underlyingActor.context.stop(watchee.ref)

通过使用contextofTestActorRef我相信Terminated将全部通过 the 传递CallingThreadDispatcher,因此是完全同步的。我在一个循环中测试了它,它为我工作了超过 1000 次迭代。

现在我想这可能是因为我正在stop使用相同的演员来执行预期Terminated,也许有一个优化来Terminated为那个扫描仪提供自我,所以我也用完全不同的方式测试了这个,Actor如下所示:

class FooActor extends Actor{
  def receive = {
    case _ =>
  }

然后在测试代码中:

val foo = TestActorRef(new FooActor)

在停止时:

foo.underlyingActor.context.stop(watchee.ref)

这也按预期工作。

于 2014-10-20T17:55:12.643 回答
3

编辑:在与 OP 讨论和测试之后,我们发现发送 PoisonPill 作为终止被监视actor的一种手段可以实现所需的行为,因为 PPill 的 Terminated 是同步处理的,而 stop 或 kill 的那些是异步处理的。

虽然我们不确定原因,但我们最好的选择是这是因为杀死演员会引发异常,而 PPilling 不会引发异常。

显然,这与按照我最初的建议使用 gracefulStop 模式无关,我将在下面留下报告。

总之,解决 OP 问题的方法就是终止被监视的 actor 发送一个 PPill 而不是一个 Kill 消息或执行 system.stop。


旧答案从这里开始:

我可能会提出一些无关紧要的建议,但我觉得它可能适用。

如果我理解正确,您想要做的基本上是同步终止演员,即仅在演员正式死亡并且其死亡已被记录(在您的情况下由观察者记录)时才返回。

一般来说,死亡通知以及 akka 中的大多数其他内容都是异步的。尽管如此,使用gracefulStop 模式(akka.pattern.gracefulStop)可以获得同步死亡确认。

为此,代码应类似于:

val timeout = 5.seconds
val killResultFuture = gracefulStop(victimRef, timeout, PoisonPill)
Await.result(killResultFuture, timeout)

这样做是将 PoisonPill 发送给受害者(注意:您可以使用自定义消息),受害者将回复一个在受害者死亡后完成的未来。使用 Await.result 你可以保证是同步的。

不幸的是,这仅在以下情况下可用:a)您正在积极杀死受害者,并且您不想对外部死亡原因做出响应 b)您可以在代码中使用超时和阻塞。但也许你可以根据你的情况调整这种模式。

于 2014-10-22T14:55:48.643 回答