1

我在测试中模拟三个演员(A,B,C)之间的对话

A --->  MessageA2B  ---> B --->  MessageB2C ---> C 

MessageB2C成功到达 C 时,确认被发送回源。

C --> MessageB2C_Ack --> B --> MessageA2B_Ack --> A

这次谈话的唯一特点是信息MessageB2CMessageB2C至少每 50 ms 发送一次,直到 C 没有用它的确认来回答。

我已经用 scala testkit 框架实现了这个简单的对话,但是在特定情况下测试失败了。

https://github.com/freedev/reactive-akka-testkit

当 ActorB 重试发送 MessageB2C 的次数超过一次时,则无法从 ActorC 接收到答案。ActorC 对 ActorB 的回复发送到 deadLetters。

  test("expectNoMessage-case: actorB retries MessageB2C every 50 milliseconds") {
    val actorA = TestProbe()
    val actorC = TestProbe()
    val actorB = system.actorOf(ActorB.props(Props(classOf[TestRefWrappingActor], actorC)), "step1-case2-primary")

    actorA.send(actorB, MessageA2B())

    actorA.expectNoMessage(100.milliseconds)

    actorC.expectMsg(MessageB2C())

    // Retries form above
    actorC.expectMsg(200.milliseconds, MessageB2C())

    // Never reach this point with 100 ms frequency
    actorC.expectMsg(200.milliseconds, MessageB2C())

    actorA.expectNoMessage(100.milliseconds)

    actorC.reply(MessageB2C_Ack())

    // Never reach this point with MessageB2C 50 ms frequency
    actorA.expectMsg(MessageA2B_Ack())
  }

这是ActorB代码:

class ActorB(actorCProps: Props) extends Actor {
  import ActorB._
  import context.dispatcher

  val log = Logging(context.system, this)

  val actorC = context.actorOf(actorCProps)

  def retry(actorRef: ActorRef, message: Any, maxAttempts: Int, attempt: Int): Future[Any] = {
    log.info("ActorB - sent message MessageB2C to ActorC " + actorC)
    val future = Patterns.ask(actorRef, message, 50.millisecond) recover {
      case e: AskTimeoutException =>
        if (attempt <= maxAttempts) retry(actorRef, message, maxAttempts, attempt + 1)
        else None
    }
    future
  }

  def receive = {
    case r:MessageA2B => {
      val client = context.sender()
      implicit val timeout = Timeout(100.milliseconds)
      implicit val scheduler=context.system.scheduler
      val p = MessageB2C()

      retry(actorC, p, 10) onSuccess({
        case p: MessageB2C_Ack => {
          client ! MessageA2B_Ack()
        }
      })
    }
  }
}

更新

奇怪的是,测试成功完成评论该行actorA.expectNoMessage

4

2 回答 2

1

像这样的测试可能很不稳定,因为它们取决于执行的确切时间,但实际执行可能会受到各种因素的影响,包括系统负载(可能还有宇宙射线)。也就是说,通过对 ActorB 的一些修改,我能够让你的测试通过......

 RetrySupport
    .retry(() => {
      log.info("ActorB - sent message MessageB2C to ActorC " + actorC)
      Patterns.ask(actorC, p, 100.millisecond)
    }, 10, 0.millisecond)
    .onSuccess({
      case p: MessageB2C_Ack => {
        log.info(
          "ActorB - Received MessageB2C_Ack so now sending an MessageA2B_Ack to client " + client
        )
        client ! MessageA2B_Ack()
      }
    })

调试问题的关键是查看日志“ActorB - 向 ActorC Actor 发送消息 MessageB2C”的时间安排。我看到的时间约为 250 毫秒。问题是您等待的时间长度ask被添加delayretry. 为了在消息之间等待 100 毫秒,您应该将延迟时间设置为 0,并使用请求超时来管理重试调度。

于 2019-09-15T23:15:55.047 回答
0

环顾四周寻找答案,终于找到了一些东西,好吧有人。

长话短说,这是我在 Akka 论坛上的回复:

使用 可能不明显的一点Patterns.ask是,它会生成一个临时演员,该演员只存在于接收回复并完成Futureask. 该临时演员被用作MessageB2C发送给演员 C 的消息的发送者。这意味着当 C 回复时,它不是给 B,而是给由 . 创建的临时演员ask。在每次重试中,这将是一个新的演员,当 ask超时时,临时演员将被停止。

然后,当您使用 时expectNoMessage,测试必须等待您传递给它的整个持续时间(100 毫秒)才能继续。这意味着当你调用actorC.reply(MessageB2C_Ack())时,前一条消息的临时发送者将超时并停止,并且actor C 的邮箱中应该有一条新的重试消息。这就是为什么回复是死信的原因。我相信您将能够通过 actorC.expectMsg(200.milliseconds, MessageB2C())actorA.expectNoMessage(100.milliseconds)and 之间添加另一个来解决这个问题actorC.reply(MessageB2C_Ack())

一般来说,ask通常不鼓励使用像这样的来自 Actor 内部的方法,而是直接在发送 Actor 中直接对请求和响应流进行建模。这通常更简单,更不“神奇”,像这样的惊喜更少。这篇文章很好地解释了一些权衡 https://medium.com/@yuriigorbylov/akka-ask-antipattern-8361e9698b20

我发现这个答案确实很有趣,我决定切换我的解决方案,采用错误内核模式。

于 2019-09-24T09:20:23.093 回答