4

我是 akka 的新手,我想一次运行一个列表 akka 演员来获取消息列表,例如(我在这里使用 scala):

输入

message_1
message_2
message_3

每条消息的输出:

1
2
3

将一次运行 3 个 akka 演员,将列表中的每个参数传递给他们每个人,我想稍后将这些结果用作List答案。理想情况下,我正在寻找可以为我提供以下服务的操作:

runActorsForListAndWaitForAnswer(messagesList).map(_.toInt()).sum

我不知道这种方法是否存在,但它会有所帮助。任何帮助表示赞赏,谢谢!

4

1 回答 1

13

首先,你必须放弃命令式、阻塞式的思维定势。当一切都是异步的时,Akka 的味道最好。你永远不应该阻止,等待另一个演员的回复。相反,您可以获取Future[T]对象并对其应用功能。future 完成时,将调用此函数。

让我们举个例子:你有一个SquareActor接受 anInt并返回它的平方的 a。要求它回复的正确方法是:

squareActor ? 9 map {result =>
  println(result)  //81
}
//more code

非常重要:代码块println不会阻塞。更多代码将立即执行,您的回调方法将在稍后调用。

话虽如此,我们可以开始实施您的用例。如果我理解正确,您有一个演员列表和一个整数列表。您想将每个整数都发送给一个演员。这是一个伪代码:

val actors: List[ActorRef] = //...
val numbers: List[Int] = //...
val actorsWithArguments: List[(ActorRef, Int)] = actors zip numbers
val futuresOfAny: List[Future[Any]] = actorsWithArguments map { case (actor, number) => actor ? number}
val futures: List[Future[Int]] = futuresOfAny map {_.mapTo[Int]}
val futureOfAllResults: Future[List[Int]] = Future.sequence(futures)
val future: Future[Int] = futureOfAllResults map { _.sum}

我故意留下显式类型以帮助您遵循代码。让我们一步一步来:

  • actorsWithArguments是一个List元组。每个项目都包含一对演员和一条消息 - (actor1, message1), (actor2, message2), ...

  • futuresOfAny包含调用Future结果的列表。返回 a因为:a)结果(尚)未知 b)Akka 不知道结果的类型(回复消息)actor ? number?Future[Any]

  • futures是强类型的,因为我们知道每个回复都是一个Int. 我们mapTo每个Future[Any]Future[Int]

  • futureOfAllResultsList[Future[Int]]采用了从 a到的非常棒的转换Future[List[Int]]。换句话说,我们只是将未来结果列表转换为包含所有项目的单个未来结果。

  • future持有(将来会持有)你的结果。

如果您认为这是很多代码,那只是出于教育目的。链接和类型推断可以发挥作用:

val future = Future.sequence(actors zip numbers map { case (actor, number) => actor ? number} map { _.mapTo[Int]}) map { _.sum} }

最后你得到了你的结果。好吧,你将来会得到。现在,如果您想将该结果发送回另一个演员,您可以说:

future map {sum => otherActor ! sum}
//or even...
future map otherActor.!

强烈建议您阅读官方文档中有关 Futures的章节,以使这一切更清楚。

于 2013-01-13T20:06:43.740 回答