我使用 Akka 已经有一段时间了,而且在更简单的项目中,只使用 Scala 2.10 期货。然而,前几天我不得不将返回 Futures 的库与 Akka Actors 混合在一起,我不确定如何集成 Actor 调度系统和失控的 Futures。
在项目中使用 Akka Actor 的选择是能够微调 Actor 队列并控制代码的并行化,并且可能最终横向扩展(即使现在它不是优先级)。但是,如果我有这样的代码(故意简化):
package externallib
object DoSomething {
def foo(): Future[Something] = ....
}
package myapp
class MyActor extends Actor {
def receive = {
case "message" => externallib.DoSomething.foo() pipeTo sender
}
}
.../
val actorRef = system.actorOf(Props[MyActor].withRouter(
RoundRobinRouter(nrOfInstances = 5)))
(0 to 20000) foreach {
val futureSomething = actorRef ? "message"
}
那么 MyActor 实际上并没有做任何事情,除了在 externallib 中生成一个线程并直接返回。externallib 未来最终会将结果传递给参与者的调用者。
然而,通过这种方式,管理 Actor 的受良好控制的路由器并没有真正控制生成的线程,因为它们是在 Actor 系统之外生成的,即使在同一个 ExecutionContext 中也是如此。在大循环的示例中,这意味着不是将消息排队发送给参与者,而是这些消息将被快速消耗并在任何严格控制的队列之外产生 20000 个线程。
我在想我可以做这样的事情:
class MyActor extends Actor {
def receive = {
case "message" =>
val res = Await.result(externallib.DoSomething.foo(), someDuration)
sender ! res
}
}
这将确保"message"
在 externallib 完成(或超时)之前不会向 actor 发送新的。然而,这实际上可能需要两个线程(参与者的一个和 DoSomething 中的一个)来等待单个计算。
有没有更好的方法来控制这些在参与者系统之外产生的期货?