28

当您需要执行 I/O(即数据库操作)时,actor 模型(在 Akka 中)如何工作?

据我了解,阻塞操作会引发异常(并且由于 Akka 使用的 Netty 的事件性质,基本上会破坏所有并发性)。因此,我将不得不使用 aFuture或类似的东西 - 但是我不了解并发模型。

  1. 1个演员可以同时处理多条消息吗?
  2. 如果一个actor在a future(ie. future.get()) 中进行了阻塞调用,那么它只会阻塞当前actor的执行;还是会阻止对所有参与者的执行,直到阻塞调用完成?
  3. 如果它阻塞了所有执行,那么使用 future 辅助并发性(即将来不会调用阻塞调用仍然等于创建一个参与者并执行阻塞调用)?
  4. 处理每一步都依赖于最后一步的多阶段过程(即从数据库读取;调用阻塞 Web 服务;从数据库读取;写入数据库)的最佳方法是什么?

基本上下文是这样的:

  • 我正在使用将维护数千个会话的 Websocket 服务器。
  • 每个会话都有一些状态(即身份验证详细信息等);
  • Javascript 客户端将向服务器发送 JSON-RPC 消息,服务器会将其传递给适当的会话参与者,后者将执行它并返回结果。
  • RPC 调用的执行将涉及一些 I/O 和阻塞调用。
  • 将有大量并发请求(每个用户将通过 WebSocket 连接发出大量请求,并且会有很多用户)。

有没有更好的方法来实现这一目标?

4

3 回答 3

28

在 Akka 中,阻塞操作不会引发异常。您可以阻止来自 Actor 的呼叫(您可能希望将其最小化,但那是另一回事)。

  1. 不,1 个演员实例不能。
  2. 它不会阻止任何其他参与者。您可以通过使用特定的 Dispatcher 来影响这一点。Futures 使用默认调度器(通常是全局事件驱动的调度器),因此它在池中的线​​程上运行。您可以选择要为您的演员(每个演员或全部)使用的调度程序。我想如果你真的想创建一个问题,你可以将完全相同的(基于线程的)调度程序传递给期货和演员,但这需要你的一些意图。我想如果你有大量的期货无限期地阻塞并且 executorservice 已经被配置为固定数量的线程,你可能会炸毁 executorservice。所以有很多“如果”。仅当 Future 尚未完成时 f.get 才会阻塞。它将阻塞您调用它的 Actor 的“当前线程”(如果您从 Actor 调用它,
  3. 你不一定要阻止。您可以使用回调而不是 f.get。你甚至可以在没有阻塞的情况下编写 Futures。查看 Viktor 关于“akka 充满希望的未来”的演讲,了解更多详情:http ://skillsmatter.com/podcast/scala/talk-by-viktor-klang
  4. 我会在步骤之间使用异步通信(如果这些步骤本身就是有意义的过程),所以每个步骤都使用一个参与者,每个参与者都向下一个参与者发送单向消息,也可能向其他不会发送单向消息的参与者发送消息可以监督进程的块。通过这种方式,您可以创建一系列参与者,您可以在其中创建许多参与者,您可以在其前面放置一个负载平衡参与者,这样如果一个参与者在一个链中阻塞,另一个相同类型的参与者可能不会在另一个链中。这也适用于您的“上下文”问题,将工作负载传递给本地参与者,将它们链接在负载平衡参与者后面。

至于 netty(我假设您的意思是远程 Actor,因为这是 netty 在 Akka 中唯一使用的东西),如果您担心,请尽快将您的工作传递给本地 Actor 或未来(带有回调)关于时间或阻止netty以某种方式完成它的工作。

于 2011-06-30T06:30:04.700 回答
10

阻塞操作通常不会抛出异常,但等待未来(例如通过 using!!!!!send 方法)可能会抛出超时异常。这就是为什么您应该尽可能坚持使用即发即弃,使用有意义的超时值并在可能的情况下更喜欢回调的原因。

  1. akka actor 不能显式地连续处理多条消息,但您可以throughput通过配置文件使用该值。然后,如果它的消息队列不为空,actor 将处理几条消息(即它的接收方法将被依次调用多次):http: //akka.io/docs/akka/1.1.3/scala/dispatchers.html#id5

  2. 一个actor内部的阻塞操作不会“阻塞”所有的actor,但是如果你在actor之间共享线程(推荐使用),dispatcher的一个线程将被阻塞直到操作恢复。因此,请尝试尽可能多地组合期货,并注意超时值)。

3 和 4。我同意 Raymond 的回答。

于 2011-06-30T09:15:40.430 回答
1

Raymond 和范式所说的,而且,如果你想避免让线程池挨饿,你应该将任何阻塞操作包装在scala.concurrent.blocking.

当然最好避免阻塞操作,但有时您需要使用阻塞的库。如果您将所述代码包装在 中blocking,它将让执行上下文知道您可能正在阻塞该线程,以便它可以在需要时分配另一个线程。

这个问题比典型描述的更糟糕,因为如果你有几个阻塞操作,你最终可能会阻塞线程池中的所有线程并且没有空闲线程。如果您的所有线程都被阻塞在另一个演员/未来被安排之前不会发生的事情上,那么您最终可能会陷入死锁。

这是一个例子:

导入 scala.concurrent.blocking
...

未来 {
  val image = 阻塞 { load_image_from_potentially_slow_media() }
  val 增强 = image.enhance()
  阻止{
    如果(oracle.queryBetter(图像,增强)){
      write_new_image(增强)
    }
  }
  增强型
}

文档在这里

于 2014-01-27T19:55:00.563 回答