0

我使用 Akka 已经有一段时间了,而且在更简单的项目中,只使用 Scala 2.10 期货。然而,前几天我不得不将返回 Futures 的库与 Akka Actors 混合在一起,我不确定如何集成 Actor 调度系统和失控的 Futures。

在项目中使用 Akka Actor 的选择是能够微调 Actor 队列并控制代码的并行化,并且可能最终横向扩展(即使现在它不是优先级)。但是,如果我有这样的代码(故意简化):

package externallib

object DoSomething {
    def foo(): Future[Something] = ....
}


package myapp

class MyActor extends Actor {
    def receive = {
        case "message" => externallib.DoSomething.foo() pipeTo sender
    }
}

.../
val actorRef = system.actorOf(Props[MyActor].withRouter(
                   RoundRobinRouter(nrOfInstances = 5)))
(0 to 20000) foreach {

    val futureSomething = actorRef ? "message"

}

那么 MyActor 实际上并没有做任何事情,除了在 externallib 中生成一个线程并直接返回。externallib 未来最终会将结果传递给参与者的调用者。

然而,通过这种方式,管理 Actor 的受良好控制的路由器并没有真正控制生成的线程,因为它们是在 Actor 系统之外生成的,即使在同一个 ExecutionContext 中也是如此。在大循环的示例中,这意味着不是将消息排队发送给参与者,而是这些消息将被快速消耗并在任何严格控制的队列之外产生 20000 个线程。

我在想我可以做这样的事情:

class MyActor extends Actor {
    def receive = {
        case "message" => 
            val res = Await.result(externallib.DoSomething.foo(), someDuration)
            sender ! res
    }
}

这将确保"message"在 externallib 完成(或超时)之前不会向 actor 发送新的。然而,这实际上可能需要两个线程(参与者的一个和 DoSomething 中的一个)来等待单个计算。

有没有更好的方法来控制这些在参与者系统之外产生的期货?

4

1 回答 1

0

MyActor 看起来很危险,在 Actor 内部阻塞并不是一个好主意,事实上,如果 ExecutionContext 中只有一个线程供 Actor 和未来使用,这将死锁。写这个的更好的方法是onComplete在 future 上放置一个,并让 actorbecome对一个行为执行一个使其不发送任何新的 futures 的行为(可能通过stashing 所有传入的消息)。onComplete未来既可以向发送者发送消息,也可以向参与者发送消息以告诉它原始become行为。

于 2013-09-03T01:01:33.673 回答