12

我想从我自己的线程中与 Akka 演员互动。目前,我喜欢这样:

val res = Await.result(aref ? GroupReceive(fromRank), timeout.duration).asInstanceOf[T]

但我不确定这实际上是如何与我的线程交互的?我希望接收是异步的,即我想在接收时挂断线程以允许完成一些其他工作。我最近刚读到 Akka 收件箱系统。收件箱 akka api

我想我记得 Await 每次都会创建一个新演员。await+ask 和收件箱有什么区别,有人可以给我一个示例,说明如何创建收件箱并使用它与“外部”的演员交流吗?

编辑 澄清一下,我不希望同一个线程继续工作,我希望它停止占用 cpu 核心并让其他线程工作,直到它收到一些东西,然后再次醒来。

4

3 回答 3

9

正如 Akka 的 Future 文档中所写,使用 Await 会阻塞当前线程,直到等待结果。

例子

import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

implicit val timeout = Timeout(5 seconds)
val future = actor ? msg // enabled by the “ask” import
val result = Await.result(future, timeout.duration).asInstanceOf[String]

这将导致当前线程阻塞并等待 Actor用它的回复“完成”Future。

与演员一起使用

于 2013-05-24T09:17:55.003 回答
5

Await.receive是 Scala 并发 API 的一部分,与参与者无关。它的目的是阻塞当前线程,直到提供的未来完成,或者超时限制开始,整个事情以超时异常结束。

ask 操作符?确实会创建一个临时参与者,其唯一目的是等待变量指向的参与者的回复,aref并完成您在使用收到的回复调用 ask 操作员时获得的未来。

所以你的代码基本上阻塞了整个线程。如前所述,如果您想释放当前线程并继续做一些其他工作,您可以将回调附加到未来。

implicit val ctx: ExecutionContext = //provide execution context here
implicit val timeout: Timeout = // provide timeout here
aref ? GroupReceive(fromRank)) onSuccess { res =>
   //do something with res here, asynchronously
}
// some other code which runs without being blocked...

上面的代码可以用你上面提到的actor DSL重写:

import akka.actor.ActorDSL._
implicit val actorSystem: ActorSystem = // provide an actor system here or any actor ref factory

actor(new Act {
  aref ! GroupReceive(fromRank)
  context.setReceiveTimeout(timeout) //optional
  become {
    case ReceiveTimeout => {
      //handle the timeout
      context.stop(self)
    }
    case res => {
      //do your thing with res, asynchronously
      context.stop(self)
    }
  }
}

//some other code which won't wait for the above operations

后一个版本还创建了一个新的临时actor,它发送GroupReceive消息然后等待回复,之后它会杀死自己。

底线是,为了接收来自演员的消息,您自己必须是演员。Actor 不能只向ActorRef.

所以要么你使用 ask 模式在幕后创建一个临时演员并管理这个临时演员的生命周期本身,向你展示一个很好的简单未来可以使用,或者你可以自己创建临时演员,但你必须管理它的生命周期(即记住一旦它完成它的工作就杀死它)

选择最适合您的选项。

于 2013-05-24T11:09:16.850 回答
0

如果您不想在调用方阻塞,则不要使用 Await,而是使用非阻塞回调,例如 onSuccess、onFailure 和 onComplete。当您这样做时,未来的任务将被放入询问(?)时范围内的任何 ExecutionContext 中。收到响应后,将通过 ExecutionContext 异步调用此回调。这样可以避免在向参与者发出请求的线程中一起阻塞,然后在与 ExecutionContext 相关联的线程池中处理回调。

此外,我相信您提到的收件箱内容旨在测试 REPL 中的演员内容(至少 ActorDsl 上的文档是这样说的)。坚持使用从演员外部询问的方法。让 akka 为非参与者到参与者的调用创建其底层通信所需的短期参与者。然后按照我上面的建议切换到非阻塞回调。我相信这就是你要找的。

于 2013-05-24T10:58:38.143 回答