I am simulating a conversation between three actors (A, B, C) in a test:
A ---> MessageA2B ---> B ---> MessageB2C ---> C
When MessageB2C successfully arrives at C, the acknowledgement is sent back to the origin:
C --> MessageB2C_Ack --> B --> MessageA2B_Ack --> A
The only peculiarity of this conversation is the message MessageB2C: B resends it at least every 50 ms until C answers with its acknowledgement.
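To make the message flow concrete, here is a minimal sketch of the protocol in plain Scala. The four message names come from the test and actor code in this post; the `Protocol` trait and the `handledByB` helper are hypothetical names introduced only for illustration, and the real definitions in the linked repository may differ.

```scala
// Sketch of the message protocol (assumed shape: empty case classes,
// which matches the MessageB2C() / MessageA2B_Ack() calls used in the test).
sealed trait Protocol
final case class MessageA2B() extends Protocol
final case class MessageB2C() extends Protocol
final case class MessageB2C_Ack() extends Protocol
final case class MessageA2B_Ack() extends Protocol

object ProtocolSketch {
  // Hypothetical pure description of B's role: what B emits for each input.
  def handledByB(msg: Protocol): Option[Protocol] = msg match {
    case MessageA2B()     => Some(MessageB2C())     // request from A is forwarded to C
    case MessageB2C_Ack() => Some(MessageA2B_Ack()) // ack from C is propagated back to A
    case _                => None                   // B does not react to anything else
  }
}
```

Because these are case classes, `expectMsg(MessageB2C())` in a test compares by value, so every retried copy of the message matches the same expectation.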
I have implemented this simple conversation with the Scala testkit framework, but the test fails in one particular situation.
https://github.com/freedev/reactive-akka-testkit
When ActorB retries sending MessageB2C more than once, it is no longer able to receive the answer from ActorC. The reply from ActorC to ActorB is sent to deadLetters.
    test("expectNoMessage-case: actorB retries MessageB2C every 50 milliseconds") {
      val actorA = TestProbe()
      val actorC = TestProbe()
      val actorB = system.actorOf(ActorB.props(Props(classOf[TestRefWrappingActor], actorC)), "step1-case2-primary")
      actorA.send(actorB, MessageA2B())
      actorA.expectNoMessage(100.milliseconds)
      actorC.expectMsg(MessageB2C())
      // Retries from above
      actorC.expectMsg(200.milliseconds, MessageB2C())
      // Never reach this point with 100 ms frequency
      actorC.expectMsg(200.milliseconds, MessageB2C())
      actorA.expectNoMessage(100.milliseconds)
      actorC.reply(MessageB2C_Ack())
      // Never reach this point with MessageB2C 50 ms frequency
      actorA.expectMsg(MessageA2B_Ack())
    }
This is the ActorB code:
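    import akka.actor.{Actor, ActorRef, Props}
    import akka.event.Logging
    import akka.pattern.{AskTimeoutException, Patterns}
    import akka.util.Timeout
    import scala.concurrent.Future
    import scala.concurrent.duration._

    class ActorB(actorCProps: Props) extends Actor {
      import ActorB._
      import context.dispatcher

      val log = Logging(context.system, this)
      val actorC = context.actorOf(actorCProps)

      // Asks actorRef with a 50 ms timeout and asks again when the ask times out.
      def retry(actorRef: ActorRef, message: Any, maxAttempts: Int, attempt: Int): Future[Any] = {
        log.info("ActorB - sent message MessageB2C to ActorC " + actorC)
        val future = Patterns.ask(actorRef, message, 50.millisecond) recover {
          case e: AskTimeoutException =>
            if (attempt <= maxAttempts) retry(actorRef, message, maxAttempts, attempt + 1)
            else None
        }
        future
      }

      def receive = {
        case r: MessageA2B => {
          // Capture the sender now; it must not be read inside the future callback.
          val client = context.sender()
          implicit val timeout = Timeout(100.milliseconds)
          implicit val scheduler = context.system.scheduler
          val p = MessageB2C()
          // Up to 10 retries, starting with attempt 1
          retry(actorC, p, 10, 1) onSuccess({
            case p: MessageB2C_Ack => {
              client ! MessageA2B_Ack()
            }
          })
        }
      }
    }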
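For comparison, here is a minimal sketch of the same retry idea written against plain `scala.concurrent` futures, without Akka. It uses `recoverWith` instead of `recover`: when the recovery body itself returns a `Future`, `recover` wraps that inner future as the result value, whereas `recoverWith` flattens it so the caller sees the eventual reply. The names `RetrySketch`, `attemptOnce`, `flakyAsk`, and `demo` are hypothetical, and `flakyAsk` only stands in for an ask that times out twice before an acknowledgement arrives. Whether this difference plays any part in the failing test is not established here.

```scala
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RetrySketch {
  // Runs attemptOnce and, on a timeout, tries again until maxAttempts is reached.
  // recoverWith flattens the Future returned by the recursive call.
  def retry[T](maxAttempts: Int, attempt: Int = 1)(attemptOnce: () => Future[T])
              (implicit ec: ExecutionContext): Future[T] =
    attemptOnce().recoverWith {
      case _: TimeoutException if attempt < maxAttempts =>
        retry(maxAttempts, attempt + 1)(attemptOnce)
    }

  // Hypothetical demo: returns the final reply and the number of attempts made.
  def demo(): (String, Int) = {
    import ExecutionContext.Implicits.global
    val calls = new AtomicInteger(0)

    // Stand-in for the ask: the first two attempts time out, the third is answered.
    def flakyAsk(): Future[String] =
      if (calls.incrementAndGet() < 3) Future.failed(new TimeoutException("no ack yet"))
      else Future.successful("MessageB2C_Ack")

    val reply = Await.result(retry(10)(() => flakyAsk()), 1.second)
    (reply, calls.get())
  }
}
```

Calling `RetrySketch.demo()` yields the reply of the third attempt together with the attempt count, which shows that the result of a later retry reaches the original caller.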
Update

Strangely, the test completes successfully if I comment out the line actorA.expectNoMessage.