30

作为一个新手,我试图了解演员是如何工作的。而且,从文档中,我想我明白演员是在同步模式下执行的对象,而且演员执行可以包含阻塞/同步方法调用,例如数据库请求

但是,我不明白的是,如果你编写一个内部有一些阻塞调用的actor(比如阻塞查询执行),它会弄乱整个线程池(从某种意义上说,cpu 利用率会下降,等等。 ), 正确的 ?我的意思是,根据我的理解,如果/当参与者进行阻塞调用时,JVM 无法理解它是否可以将该线程切换到其他人。

那么,考虑到并发的性质,Actor 不应该做任何阻塞调用是不是很明显?

如果是这种情况,那么进行非阻塞/异步调用的推荐方法是什么,比如说一个 Web 服务调用,它会在请求完成时获取一些东西并向另一个参与者发送消息?我们是否应该简单地在演员中使用类似的东西:

未来地图 { 响应 => x !响应体}

这是处理这个问题的正确方法吗?

如果您能为我澄清这一点,将不胜感激。

4

3 回答 3

17

这实际上取决于用例。如果查询不需要序列化,那么您可以在以后执行查询并将结果发送回发送方,如下所示:

import scala.concurrent.{ future, blocking}
import akka.pattern.pipe

val resFut = future {
  blocking {
    executeQuery()
  }
}

resFut pipeTo sender

您还可以专门为 DB 调用创建一个专用的调度程序,并使用一个路由器来创建参与者。通过这种方式,您还可以轻松限制并发 DB 请求的数量。

于 2013-11-13T07:44:24.143 回答
16

非常棒的介绍“Scala 新手指南第 14 部分:Actor 并发方法” http://danielwestheide.com/blog/2013/02/27/the-neophytes-guide-to-scala-part-14-the- actor-approach-to-concurrency.html

Actor 接收消息,将阻塞代码包装到未来,在它的 Future.onSuccess 方法中 - 使用其他异步消息发送结果。但请注意,发件人变量可能会更改,因此请关闭它(在未来对象中进行本地引用)。

ps: The Neophyte's Guide to Scala - 非常棒的书。

更新:(添加示例代码)

我们有工人和经理。经理设置要完成的工作,工人报告“得到它”并开始长流程( sleep 1000 )。同时,系统用“活着”的消息 ping 经理,经理用它们 ping 工人。工作完成后 - 工人通知经理。

注意:在导入的“默认/全局”线程池执行器中执行 sleep 1000 - 你可以得到线程饥饿。注意: val command = sender 需要“关闭”对原始发件人的引用,因为当 onSuccess 将被执行时 - 演员中的当前发件人可能已经设置为其他一些“发件人”......

日志:

01:35:12:632 Humming ...
01:35:12:633 manager: flush sent
01:35:12:633 worker: got command
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:660 worker: started
01:35:12:662 worker: alive
01:35:12:662 manager: resource allocated
01:35:12:662 worker: alive
01:35:12:662 worker: alive
01:35:13:661 worker: done
01:35:13:663 manager: work is done
01:35:17:633 Shutdown!

代码:

import akka.actor.{Props, ActorSystem, ActorRef, Actor}
import com.typesafe.config.ConfigFactory
import java.text.SimpleDateFormat
import java.util.Date
import scala.concurrent._
import ExecutionContext.Implicits.global

object Sample {

  private val fmt = new SimpleDateFormat("HH:mm:ss:SSS")

  def printWithTime(msg: String) = {
    println(fmt.format(new Date()) + " " + msg)
  }

  class WorkerActor extends Actor {
    protected def receive = {
      case "now" =>
        val commander = sender
        printWithTime("worker: got command")
        future {
          printWithTime("worker: started")
          Thread.sleep(1000)
          printWithTime("worker: done")
        }(ExecutionContext.Implicits.global) onSuccess {
          // here commander = original sender who requested the start of the future
          case _ => commander ! "done" 
        }
        commander ! "working"
      case "alive?" =>
        printWithTime("worker: alive")
    }
  }

  class ManagerActor(worker: ActorRef) extends Actor {
    protected def receive = {
      case "do" =>
        worker ! "now"
        printWithTime("manager: flush sent")
      case "working" =>
        printWithTime("manager: resource allocated")
      case "done" =>
        printWithTime("manager: work is done")
      case "alive?" =>
        printWithTime("manager alive")
        worker ! "alive?"
    }
  }

  def main(args: Array[String]) {

    val config = ConfigFactory.parseString("" +
      "akka.loglevel=DEBUG\n" +
      "akka.debug.lifecycle=on\n" +
      "akka.debug.receive=on\n" +
      "akka.debug.event-stream=on\n" +
      "akka.debug.unhandled=on\n" +
      ""
    )

    val system = ActorSystem("mine", config)
    val actor1 = system.actorOf(Props[WorkerActor], "worker")
    val actor2 = system.actorOf(Props(new ManagerActor(actor1)), "manager")

    actor2 ! "do"
    actor2 ! "alive?"
    actor2 ! "alive?"
    actor2 ! "alive?"

    printWithTime("Humming ...")
    Thread.sleep(5000)
    printWithTime("Shutdown!")
    system.shutdown()

  }
}
于 2013-11-13T04:24:22.450 回答
1

如果您正在考虑在 Akka 中进行阻塞调用,那么您考虑线程池是正确的。你做的阻塞越多,你需要的线程池就越大。一个完全非阻塞的系统只需要一个与你机器的 CPU 内核数量相等的线程池。参考配置使用机器上 CPU 核心数 3 倍的池来允许一些阻塞:

    # The core pool size factor is used to determine thread pool core size
    # using the following formula: ceil(available processors * factor).
    # Resulting size is then bounded by the core-pool-size-min and
    # core-pool-size-max values.
    core-pool-size-factor = 3.0

资源

但是,如果您进行更多阻塞,您可能希望增加akka.default-dispatcher.fork-join-executor.core-pool-size-factor一个更高的数字,或者创建一个不同于默认值的调度程序,专门用于阻塞具有更高级别的调用fork-join-executor.core-pool-size-factor

WRT 在 Akka 中阻止调用的最佳方法是什么。我建议通过创建多个执行阻塞调用的参与者实例并在它们前面放置一个路由器来使它们看起来像应用程序的其余部分的单个参与者来扩展。

于 2013-11-13T08:28:38.440 回答