25

我浏览了一些关于 akka 如何以及为什么不保证消息传递的帖子。文档、这个讨论其他关于小组的讨论确实很好地解释了这一点。

我对 akka 很陌生,想知道一个合适的设计。例如,假设我有 3 个不同的演员都在不同的机器上。一个负责烹饪书,另一个负责历史,最后一个负责技术书籍。

我在另一台机器上有一个主要演员。假设如果我们有一些可用的书,那么有一个要搜索的主要演员的查询。主要参与者向 3 个远程参与者发送请求,并期待结果。所以我这样做:

  val scatter = system.actorOf(
        Props[SearchActor].withRouter(ScatterGatherFirstCompletedRouter(
              routees=someRoutees, within = 10 seconds)), "router")
  implicit val timeout = Timeout(10 seconds)
  val futureResult = scatter ?  Text("Concurrency in Practice")
  //      What should I do here?.
  //val result = Await.result(futureResult, timeout.duration) line(a)

简而言之,我已经向所有 3 个远程参与者发送了请求,并期望在 10 秒内得到结果。

应该采取什么行动?

  1. 假设我在 10 秒内没有得到结果,我应该再次向所有人发送新请求吗?
  2. 如果within上面的时间还为时过早怎么办。但我不知道可能需要多长时间。
  3. 如果within时间足够但消息被丢弃怎么办。

如果我没有及时得到响应within并重新发送请求。像这样,它保持异步:

futureResult onComplete{
  case Success(i) => println("Result "+i)
  case Failure(e) => //send again
}

但是在太多的查询下,会不会调用线程太多,体积庞大?如果我取消注释line(a),它会变得同步并且在负载下可能会表现不佳。

说我在 10 秒内没有得到回应。如果within时间还为时过早,那么它会再次发生大量无用的计算。如果消息被丢弃,那么10浪费了几秒钟的宝贵时间。以防万一,假设我知道消息已送达,我可能会等待更长的时间而不持怀疑态度。

人们如何解决这些问题?ACK? 但是我必须将状态存储在所有查询的参与者中。这一定是一件常见的事情,我正在寻找合适的设计。

4

1 回答 1

25

我将尝试为您回答其中的一些问题。我不会对所有事情都有具体的答案,但希望我能引导你朝着正确的方向前进。

对于初学者,您需要更改将请求传达给进行图书搜索的 3 个参与者的方式。在这里使用 aScatterGatherFirstCompletedRouter可能不是正确的方法。该路由器只会等待其中一个路由(第一个响应)的回答,因此您的结果集将不完整,因为它不包含来自其他 2 个路由的结果。还有一个BroadcastRouter, 但这也不适合您的需求,因为它只处理tell (!)而不是ask (?). 要执行您想做的事情,一种选择是将请求发送给每个接收者,获取Futures响应,然后Future使用Future.sequence. 一个简化的示例可能如下所示:

case class SearchBooks(title:String)
case class Book(id:Long, title:String)

class BookSearcher extends Actor{

  def receive = {
    case req:SearchBooks =>
      val routees:List[ActorRef] = ...//Lookup routees here
      implicit val timeout = Timeout(10 seconds)
      implicit val ec = context.system.dispatcher

      val futures = routees.map(routee => (routee ? req).mapTo[List[Book]])
      val fut = Future.sequence(futures)

      val caller = sender //Important to not close over sender
      fut onComplete{
        case Success(books) => caller ! books.flatten

        case Failure(ex) => caller ! Status.Failure(ex)
      }
  }
}

现在这不会是我们的最终代码,但它是您的示例尝试执行的近似值。在这个例子中,如果任何一个下游路由失败/超时,我们将命中我们的Failure块,调用者也将失败。如果它们都成功,调用者将获得Book对象的聚合列表。

现在回答你的问题。首先,如果您在超时时间内没有从其中一个路由中得到答案,您会询问是否应该再次向所有参与者发送请求。这个问题的答案真的取决于你。您会允许另一端的用户看到部分结果(即 3 个参与者中的 2 个的结果),还是每次都必须是完整的结果集?如果答案是肯定的,您可以将发送到 routees 的代码调整为如下所示:

val futures = routees.map(routee => (routee ? req).mapTo[List[Book]].recover{
  case ex =>
    //probably log something here
    List()
})

使用此代码,如果任何路由因任何原因超时或失败,则会用一个空的“Book”列表代替响应而不是失败。现在,如果您不能忍受部分结果,那么您可以重新发送整个请求,但您必须记住,另一端可能有人在等待他们的图书结果,他们不想永远等待。

对于第二个问题,您问如果您的超时时间过早怎么办?您选择的超时值将完全取决于您,但它很可能应该基于两个因素。第一个因素将来自测试搜索的调用时间。找出平均需要多长时间,并在此基础上选择一个值,并稍加缓冲以确保安全。第二个因素是另一端的人愿意等待他们的结果多长时间。您可以在超时时非常保守,为了安全起见将其设置为 60 秒,但如果另一端确实有人在等待结果,他们愿意等待多长时间?我宁愿得到一个失败响应,表明我应该再试一次,而不是永远等待。所以考虑到这两个因素,

对于问题 3,您询问如果消息被丢弃会发生什么。在这种情况下,我猜测接收该消息的人的未来将只是超时,因为它不会得到响应,因为接收者参与者永远不会收到要响应的消息。Akka 不是 JMS;它没有确认模式,如果收件人没有收到并确认消息,则可以多次重发消息。

此外,正如您从我的示例中看到的那样,我同意不Future使用Await. 我更喜欢使用非阻塞回调。在接收函数中阻塞并不理想,因为该Actor实例将停止处理其邮箱,直到阻塞操作完成。通过使用非阻塞回调,您可以释放该实例以返回处理其邮箱,并允许处理结果只是在 中执行的另一个作业ExecutionContext,与处理其邮箱的参与者分离。

现在,如果您真的不想在网络不可靠时浪费通信,您可以查看Akka 2.2 中可用的Reliable Proxy 。如果你不想走这条路线,你可以通过ping定期向路由发送类型消息来自己滚动它。如果没有及时响应,则将其标记为已关闭,并且在您获得可靠(在很短的时间内)之前不要向其发送消息ping从中,有点像每个路由的 FSM。如果您绝对需要此行为,则其中任何一个都可以工作,但您需要记住,这些解决方案会增加复杂性,并且只有在您绝对需要此行为时才应使用。如果您正在开发银行软件并且您绝对需要有保证的交付语义,否则会导致不良的财务影响,请务必采用这种方法。在决定是否需要这样的东西时要明智,因为我打赌 90% 的时间你不需要。在您的模型中,唯一可能因等待您可能已经知道不会成功的事情而受到影响的人是另一端的呼叫者。通过在actor中使用非阻塞回调,它不会因为某些事情可能需要很长时间而停止;它' s 已移至下一条消息。如果您决定在失败时重新提交,您也需要小心。您不想淹没接收演员的邮箱。如果您决定重新发送,请将其设置为固定的次数。

如果您需要这些有保证的语义,另一种可能的方法可能是查看 Akka 的聚类模型。如果您对下游路由进行了集群化,并且其中一台服务器出现故障,那么所有流量都将被路由到仍在运行的节点,直到其他节点恢复为止。

于 2013-05-29T12:25:03.067 回答