3

10 月 16 日星期三更新)今天有一个 PullRequest,它提供了有关超时的目标信息。 https://github.com/akka/akka/pull/1780


Akka 的超时异常非常无用。

有什么方法可以获得关于超时发生在哪里/发生了什么的有用消息?

像这样的例外没有帮助

java.util.concurrent.TimeoutException: Futures timed out after [5000] milliseconds
    at akka.dispatch.DefaultPromise.ready(Future.scala:834)
    at akka.dispatch.DefaultPromise.ready(Future.scala:811)
    at akka.dispatch.Await$.ready(Future.scala:64)
    at nl.cwi.crisp.examples.p2p.scala.Network.<init>(Node.scala:136)
    at nl.cwi.crisp.examples.p2p.scala.Main$$anonfun$11.apply(Node.scala:164)
    at nl.cwi.crisp.examples.p2p.scala.Main$$anonfun$11.apply(Node.scala:164)
    at akka.actor.ActorCell.newActor(ActorCell.scala:488)
    at akka.actor.ActorCell.create$1(ActorCell.scala:506)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:591)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:191)
    at akka.dispatch.Mailbox.run(Mailbox.scala:160)
    at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505)
    at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
    at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:997)
    at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1495)
    at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
4

1 回答 1

3

使用目前的akka​​代码,它不会发生。让我们先来看看为什么。如果您查看PromiseActorRef对象,您会看到:

def apply(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = {
  val result = Promise[Any]()
  val scheduler = provider.guardian.underlying.system.scheduler
  val a = new PromiseActorRef(provider, result)
  implicit val ec = a.internalCallingThreadExecutionContext
  val f = scheduler.scheduleOnce(timeout.duration) { result tryComplete Failure(new AskTimeoutException("Timed out")) }
  result.future onComplete { _ ‚áí try a.stop() finally f.cancel() }
  a
}

这是安排并行(与实际参与者调用平行)超时的地方。这个类没有关于它发送什么消息或它发送到哪个actor ref 的上下文。这可能就是为什么它只是说“超时”这不是很有帮助。我有点希望类型安全人员对此进行一些调整以提供更多信息,但如果他们不这样做,或者如果您在此期间想要一些东西,您可以尝试这样的事情:

object NewAskPattern{
  implicit def ask(ref:ActorRef) = new BetterTimeoutMessageSupportAskableRef(ref)
}

class BetterTimeoutMessageSupportAskableRef(ref: ActorRef) {
  import akka.pattern.AskableActorRef
  val askRef = new AskableActorRef(ref)

  def ask(message: Any)(implicit timeout: Timeout, ec:ExecutionContext): Future[Any] = 
    (askRef ? message) recover{
      case to:TimeoutException => 
        val recip = askRef.actorRef.path
        val dur = timeout.duration
        throw new TimeoutException(s"Timed out sending message $message to recipient $recip using timeout of $dur") 
    }

  def ?(message: Any)(implicit timeout: Timeout, ec:ExecutionContext): Future[Any] = 
    ask(message)(timeout, ec)
} 

class MySlowActor extends Actor{
  def receive = {
    case any => 
      Thread.sleep(5000)
      sender ! "bar"
  }
}

object NewMessageTest{
  import NewAskPattern.ask

  def main(args: Array[String]) {
    implicit val timeout = Timeout(2 seconds)
    val sys = ActorSystem()
    import sys.dispatcher

    val slow = sys.actorOf(Props[MySlowActor])
    val fut = slow ? "foo"
    fut onComplete (println(_))
  }
}

这里的一般想法是AskableActorRef从 Akka 库中包装并稍微增强它。我正在接受Future返回的ask内容并向其添加一个recover组合器,允许我在超时时调整消息。由于此类具有正在发送的消息以及发送给谁的上下文,因此它可以制定更有用的消息。然后NewAskPattern对象包含新的隐式给你BetterTimeoutMessageSupportAskableRef,让你获得这种增强的行为。这是一个完美的解决方案吗?可能不是,但如果你真的想要这种行为,这对你来说可能是一个很好的起点。

于 2013-08-16T12:13:53.173 回答