我将尝试为您回答其中的一些问题。我不会对所有事情都有具体的答案,但希望我能引导你朝着正确的方向前进。
对于初学者,您需要更改将请求传达给进行图书搜索的 3 个参与者的方式。在这里使用 aScatterGatherFirstCompletedRouter
可能不是正确的方法。该路由器只会等待其中一个路由(第一个响应)的回答,因此您的结果集将不完整,因为它不包含来自其他 2 个路由的结果。还有一个BroadcastRouter
, 但这也不适合您的需求,因为它只处理tell (!)
而不是ask (?)
. 要执行您想做的事情,一种选择是将请求发送给每个接收者,获取Futures
响应,然后Future
使用Future.sequence
. 一个简化的示例可能如下所示:
case class SearchBooks(title:String)
case class Book(id:Long, title:String)
class BookSearcher extends Actor{
def receive = {
case req:SearchBooks =>
val routees:List[ActorRef] = ...//Lookup routees here
implicit val timeout = Timeout(10 seconds)
implicit val ec = context.system.dispatcher
val futures = routees.map(routee => (routee ? req).mapTo[List[Book]])
val fut = Future.sequence(futures)
val caller = sender //Important to not close over sender
fut onComplete{
case Success(books) => caller ! books.flatten
case Failure(ex) => caller ! Status.Failure(ex)
}
}
}
现在这不会是我们的最终代码,但它是您的示例尝试执行的近似值。在这个例子中,如果任何一个下游路由失败/超时,我们将命中我们的Failure
块,调用者也将失败。如果它们都成功,调用者将获得Book
对象的聚合列表。
现在回答你的问题。首先,如果您在超时时间内没有从其中一个路由中得到答案,您会询问是否应该再次向所有参与者发送请求。这个问题的答案真的取决于你。您会允许另一端的用户看到部分结果(即 3 个参与者中的 2 个的结果),还是每次都必须是完整的结果集?如果答案是肯定的,您可以将发送到 routees 的代码调整为如下所示:
val futures = routees.map(routee => (routee ? req).mapTo[List[Book]].recover{
case ex =>
//probably log something here
List()
})
使用此代码,如果任何路由因任何原因超时或失败,则会用一个空的“Book”列表代替响应而不是失败。现在,如果您不能忍受部分结果,那么您可以重新发送整个请求,但您必须记住,另一端可能有人在等待他们的图书结果,他们不想永远等待。
对于第二个问题,您问如果您的超时时间过早怎么办?您选择的超时值将完全取决于您,但它很可能应该基于两个因素。第一个因素将来自测试搜索的调用时间。找出平均需要多长时间,并在此基础上选择一个值,并稍加缓冲以确保安全。第二个因素是另一端的人愿意等待他们的结果多长时间。您可以在超时时非常保守,为了安全起见将其设置为 60 秒,但如果另一端确实有人在等待结果,他们愿意等待多长时间?我宁愿得到一个失败响应,表明我应该再试一次,而不是永远等待。所以考虑到这两个因素,
对于问题 3,您询问如果消息被丢弃会发生什么。在这种情况下,我猜测接收该消息的人的未来将只是超时,因为它不会得到响应,因为接收者参与者永远不会收到要响应的消息。Akka 不是 JMS;它没有确认模式,如果收件人没有收到并确认消息,则可以多次重发消息。
此外,正如您从我的示例中看到的那样,我同意不Future
使用Await
. 我更喜欢使用非阻塞回调。在接收函数中阻塞并不理想,因为该Actor
实例将停止处理其邮箱,直到阻塞操作完成。通过使用非阻塞回调,您可以释放该实例以返回处理其邮箱,并允许处理结果只是在 中执行的另一个作业ExecutionContext
,与处理其邮箱的参与者分离。
现在,如果您真的不想在网络不可靠时浪费通信,您可以查看Akka 2.2 中可用的Reliable Proxy 。如果你不想走这条路线,你可以通过ping
定期向路由发送类型消息来自己滚动它。如果没有及时响应,则将其标记为已关闭,并且在您获得可靠(在很短的时间内)之前不要向其发送消息ping
从中,有点像每个路由的 FSM。如果您绝对需要此行为,则其中任何一个都可以工作,但您需要记住,这些解决方案会增加复杂性,并且只有在您绝对需要此行为时才应使用。如果您正在开发银行软件并且您绝对需要有保证的交付语义,否则会导致不良的财务影响,请务必采用这种方法。在决定是否需要这样的东西时要明智,因为我打赌 90% 的时间你不需要。在您的模型中,唯一可能因等待您可能已经知道不会成功的事情而受到影响的人是另一端的呼叫者。通过在actor中使用非阻塞回调,它不会因为某些事情可能需要很长时间而停止;它' s 已移至下一条消息。如果您决定在失败时重新提交,您也需要小心。您不想淹没接收演员的邮箱。如果您决定重新发送,请将其设置为固定的次数。
如果您需要这些有保证的语义,另一种可能的方法可能是查看 Akka 的聚类模型。如果您对下游路由进行了集群化,并且其中一台服务器出现故障,那么所有流量都将被路由到仍在运行的节点,直到其他节点恢复为止。