1

我正在测试一个使用异步基于未来的 API 的演员。当 future 完成时,actor 使用管道模式向自己发送消息:

import akka.pattern.pipe
// ...

// somewhere in the actor's receive method
futureBasedApi.doSomething().pipeTo(self)

在我的测试中,我模拟了 API,所以我通过 Promise 控制未来的完成。但是,这与直接发送给参与者的其他消息交错:

myActor ! Message("A")
promiseFromApiCall.success(Message("B"))
myActor ! Message("C")

现在我想知道在我的测试中如何保证actor在消息A和C之间接收和处理消息B,因为消息B实际上是在另一个线程中发送的,所以我无法控制actor邮箱接收的顺序消息。

我想了几个可能的解决方案:

  • 在每条消息后休眠几毫秒,以使另一个订单不太可能

  • 等待参与者确认每条消息,尽管确认仅在测试时需要

  • 将消息 B 直接发送给 actor 以模拟未来的完成,并编写一个单独的测试来确保正确使用管道模式(如果 actor 不将结果消息通过管道传递给自己,上面的测试不会失败)

我不太喜欢这两个选项中的任何一个,但我倾向于使用最后一个。还有另一种更好的方法可以在测试中强制执行特定的消息顺序吗?

澄清:问题不在于如何处理消息可能在生产中以随机顺序接收的事实。控制测试中的顺序对于确保参与者可以实际处理不同的消息顺序至关重要。

4

3 回答 3

1

一个想法是在你的actor中定义一个标志来指示actor是否接收到消息B。当actor接收到消息C时,如果标志为假,则actor可以隐藏该消息C,然后在actor接收到消息B时取消隐藏它例如:

class MyActor extends Actor with Stash {

  def receiveBlock(seenMsgB: Boolean, seenMsgC: Boolean): Receive = {
    case MakeApiCall =>
      callExternalApi().mapTo[MessageB].pipeTo(self)

    case m: MessageB if seenMsgC => // assume msg C has been stashed
      unstashAll()
      // ...do something with msg B
      become(receiveBlock(true, seenMsgC)) // true, true
    case m: MessageB if !seenMsgC =>
      // ...do something with message B
      become(receiveBlock(true, seenMsgC)) // true, false

    case m: MessageC if seenMsgB =>
      // ...do something with message C
      context.become(receiveBlock(seenMsgB, true)) // true, true
    case m: MessageC if !seenMsgB =>
      stash()
      context.become(receiveBlock(seenMsgB, true)) // false, true

    case ...
  }

  def receive = receiveBlock(false, false)
}
于 2018-07-09T13:20:11.200 回答
1

在阅读了更多关于 akka 的内容后,我终于找到了一个更好的解决方案:将 actor 邮箱替换为我在测试中可以观察到的邮箱。这样我就可以等到演员在我完成承诺后收到一条新消息。只有这样才能发送下一条消息。这篇TestingMailbox文章的最后给出了代码。

更新:在 Akka Typed 中,这可以通过BehaviorInterceptor. 只需使用自定义拦截器包装Behavior被测对象,该拦截器转发所有消息和信号,但让您观察它们。下面给出了无类型 Akka 的邮箱解决方案。


演员可以这样配置:

actorUnderTest = system.actorOf(Props[MyActor]).withMailbox("testing-mailbox"))

我必须通过提供配置来确保参与者系统知道“测试邮箱”:

class MyTest extends TestKit(ActorSystem("some name",
    ConfigFactory.parseString("""{ 
        testing-mailbox = {
            mailbox-type = "my.package.TestingMailbox" 
        }
    }"""))) 
    with BeforeAndAfterAll // ... and so on

设置好之后,我可以像这样更改我的测试:

myActor ! Message("A")
val nextMessage = TestingMailbox.nextMessage(actorUnderTest)
promiseFromApiCall.success(Message("B"))
Await.ready(nextMessage, 3.seconds)
myActor ! Message("C")

使用一点辅助方法,我什至可以这样写:

myActor ! Message("A")
receiveMessageAfter { promiseFromApiCall.success(Message("B")) }
myActor ! Message("C")

这是我的自定义邮箱:

import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch._
import com.typesafe.config.Config 
import scala.concurrent.{Future, Promise}

object TestingMailbox {

  val promisesByReceiver =
    scala.collection.concurrent.TrieMap[ActorRef, Promise[Any]]()

  class MessageQueue extends UnboundedMailbox.MessageQueue {

    override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
      super.enqueue(receiver, handle)
      promisesByReceiver.remove(receiver).foreach(_.success(handle.message))
    }

  }

  def nextMessage(receiver: ActorRef): Future[Any] =
    promisesByReceiver.getOrElseUpdate(receiver, Promise[Any]).future

}

class TestingMailbox extends MailboxType
  with ProducesMessageQueue[TestingMailbox.MessageQueue] {

  import TestingMailbox._

  def this(settings: ActorSystem.Settings, config: Config) = this()

  final override def create(owner: Option[ActorRef],
                            system: Option[ActorSystem]) =
      new MessageQueue()

}
于 2018-07-18T12:18:35.923 回答
0

如果订购消息非常重要,那么您应该使用ask( ?) 来返回Future并链接它们,即使您不期望来自参与者的任何响应。

于 2018-07-09T16:18:47.693 回答