1

我在测试中具有覆盖 actorOf 的特征:

trait ActorRefFactory {
  this: Actor =>

  def actorOf(props: Props) = context.actorOf(props)
}

而且我有工人演员,当收到任何消息时会停止自我:

class WorkerActor extends Actor {
  override def receive: Actor.Receive = {
    case _ => { context.stop(self) }
  }
}

我还有主演员,他创建演员并将他们排在队列中:

class MasterActor extends Actor with ActorRefFactory {
  var workers = Set.empty[ActorRef]

  override val supervisorStrategy = SupervisorStrategy.stoppingStrategy

  def createWorker() = {
    val worker = context watch actorOf(Props(classOf[WorkerActor]))
    workers += worker
    worker
  }

  override def receive: Receive = {
    case m: String =>
      createWorker()
    case Terminated(ref) =>
      workers -= ref
      createWorker()
  }
}

而这个失败的测试:

class ActorTest(val _system: ActorSystem) extends akka.testkit.TestKit(_system)
  with ImplicitSender
  with Matchers
  with FlatSpecLike {

  def this() = this(ActorSystem("test"))

  def fixture = new {
    val master = TestActorRef(new MasterActor() {
      override def actorOf(props: Props) = TestProbe().ref
    })
  }

  it should "NOT FAILED" in {
    val f = fixture

    f.master ! "create"
    f.master ! "create"

    f.master.underlyingActor.workers.size shouldBe 2

    val worker = f.master.underlyingActor.workers.head
    system.stop(worker)
    Thread.sleep(100)

    f.master.underlyingActor.workers.size shouldBe 2
  }

}

在测试中的 Thread.sleep 之后,我通过“1 不等于 2”给出错误。我不知道发生了什么。但是,如果猜测我可以假设 TestProbe() 无法及时创建。我能做些什么?

4

1 回答 1

2

这基本上归结为您希望在 Akka 的单元测试中尝试避免的异步问题。您正确地使用 a来TestActorRef吸引演员。但是当您调用时,我们仍然使用默认的异步调度程序,它将在停止然后重新创建工作人员时引入这种竞争条件。我可以看到始终如一地解决此问题的最简单方法是像这样停止工作人员:CallingThreadDispatchermastersystem.stop(worker)system

master.underlyingActor.context.stop(worker)

因为您正在使用contextofmaster并且该演员正在使用 theCallingThreadDispatcher我相信这将消除您所看到的 asnyc 问题。当我尝试它时,它对我有用。

于 2014-10-22T19:46:02.737 回答