4

我正在尝试使用 Futures 和 Akka 主管开发应用程序,但是当 A 未来向参与者返回失败时,其主管没有收到异常。

这是我的代码。

1) 监督演员

class TransaccionActorSupervisor() extends Actor with ActorLogging {

  val actor: ActorRef = context.actorOf(Props[TransaccionActor].withRouter(RoundRobinPool(nrOfInstances = 5)), "transaccion-actor")

  def receive = {
    case msg: Any => actor forward msg
  }

  override val supervisorStrategy = OneForOneStrategy() {
    case exception =>
      println("<<<<<<<<<<<<<<<<<<< IN SUPERVISOR >>>>>>>>>>>>>>>>>>>>>>>>>>>>")
      Restart
  }

}

监制演员

Class TransaccionActor() extends Actor with ActorLogging {

  implicit val _: ExecutionContext = context.dispatcher
  val transaccionAdapter = (new TransaccionComponentImpl with TransaccionRepositoryComponentImpl).adapter

  def receive = {

    case msg: GetTransaccionById =>
      val currentSender: ActorRef = sender()
      transaccionAdapter.searchTransaction(msg.id).onComplete {
         case Success(transaction) => currentSender ! transaction
         case Failure(error) => throw error
      }

  }

我究竟做错了什么?

非常感谢大家!

4

2 回答 2

4

我有同样的问题,瑞安的回答确实有帮助。但是因为我是 Akka 的新手,所以理解答案并非易事,所以我想提供一些细节。

首先,我认为这onComplete根本行不通。它只是注册可以在完全独立的线程中调用的回调函数,并且不返回新的Future. 因此,任何抛出的异常都onComplete将丢失。

相反,最好使用map, recover,recoverWithtransform返回 new Futures。然后你需要将结果传递给演员,例如self;接收参与者必须处理管道结果并重新抛出异常。

换句话说,你的监督演员应该是这样的:

import akka.pattern.pipe
import akka.actor.Status.Failure
import akka.actor.Actor

class TransaccionActor() extends Actor with ActorLogging {

  import context.dispatcher

  val transaccionAdapter = 
    (new TransaccionComponentImpl with TransaccionRepositoryComponentImpl).adapter

  def receive = {

    case msg: GetTransaccionById =>
      val currentSender: ActorRef = sender()
      transaccionAdapter searchTransaction msg.id map { transaction =>
         currentSender ! transaction
      } pipeTo self

    case Failure(throwable) => throw throwable

  }

}
于 2015-03-12T22:08:12.463 回答
0

演员内部未来抛出的异常不会被演员捕获。如果您希望监督演员处理它,您需要将异常传递给self然后重新抛出。

于 2014-12-18T03:33:21.860 回答