27

考虑一个场景,我正在实现一个使用 Akka 处理传入任务的系统。我有一个主要参与者,它接收任务并将它们分派给一些处理任务的工作参与者。

我的第一直觉是通过让调度程序为每个传入任务创建一个演员来实现这一点。工作角色处理完任务后,它会停止。

这对我来说似乎是最干净的解决方案,因为它遵循“一个任务,一个演员”的原则。另一种解决方案是重用演员 - 但这涉及清理和一些池管理的额外复杂性。

我知道阿卡的演员很便宜。但我想知道是否存在与重复创建和删除演员相关的固有成本。Akka 用于记录参与者的数据结构是否存在任何隐藏成本?

负载应该是每秒数十或数百个任务的数量级 - 将其视为一个生产网络服务器,每个请求创建一个参与者。

当然,正确的答案在于根据输入负载的类型对系统进行分析和微调。但我想知道是否有人可以根据自己的经验告诉我一些事情?

后期编辑:

我应该提供有关手头任务的更多详细信息:

  • 只有 N 个活动任务可以在某个时间点运行。正如@drexin 指出的那样 - 这可以使用路由器轻松解决。但是,任务的执行不是简单的运行和完成类型的事情。
  • 任务可能需要来自其他参与者或服务的信息,因此可能必须等待并进入睡眠状态。通过这样做,他们释放了一个执行槽。这个槽可以被另一个等待的actor占据,它现在有机会运行。您可以类比进程在一个 CPU 上的调度方式。
  • 每个工人参与者都需要保持一些关于任务执行的状态。

注意:我很欣赏我的问题的替代解决方案,我一定会考虑它们。但是,我还想回答有关 Akka 中演员的密集创建和删除的主要问题。

4

4 回答 4

21

您不应该为每个请求创建一个参与者,而应该使用路由器将消息分发给动态数量的参与者。这就是路由器的用途。阅读这部分文档以获取更多信息:http ://doc.akka.io/docs/akka/2.0.4/scala/routing.html

编辑:

创建顶级actor system.actorOf(创建子演员(在演员内部context.actorOf)要便宜得多。

但我仍然建议你重新考虑这一点,因为根据创建和删除演员的频率,你也会对 GC 施加额外的压力。

编辑2:

最重要的是,演员不是线程!因此,即使您创建了 1M 个actor,它们也只能在池中的线​​程数上运行。因此,根据配置中的吞吐量设置,每个参与者将在线程再次释放到池之前处理 n 条消息。

请注意,阻塞线程(包括睡眠)不会将其返回到池中!

于 2012-12-10T20:59:02.540 回答
8

在创建后立即收到一条消息并在发送结果后立即死亡的演员可以被未来替换。期货比演员更轻量级。

完成后,您可以使用它pipeTo来接收未来的结果。例如在你的演员启动计算:

def receive = {
  case t: Task => future { executeTask( t ) }.pipeTo(self)
  case r: Result => processTheResult(r)
}

executeTask您的函数在哪里Task返回 a Result

但是,我将通过路由器重用池中的演员,如@drexin 回答中所述。

于 2012-12-10T21:09:13.867 回答
1

I've tested with 10000 remote actors created from some main context by a root actor, same scheme as in prod module a single actor was created. MBP 2.5GHz x2:

  • in main: main ? root // main asks root to create an actor
  • in main: actorOf(child) // create a child
  • in root: watch(child) // watch lifecycle messages
  • in root: root ? child // wait for response (connection check)
  • in child: child ! root // response (connection ok)
  • in root: root ! main // notify created

Code:

def start(userName: String) = {
  logger.error("HELLOOOOOOOO ")
  val n: Int = 10000
  var t0, t1: Long = 0
  t0 = System.nanoTime
  for (i <- 0 to n) {
    val msg = StartClient(userName + i)
    Await.result(rootActor ? msg, timeout.duration).asInstanceOf[ClientStarted] match {
    case succ @ ClientStarted(userName) => 
      // logger.info("[C][SUCC] Client started: " + succ)
    case _ => 
      logger.error("Terminated on waiting for response from " + i + "-th actor")
      throw new RuntimeException("[C][FAIL] Could not start client: " + msg)
    }
  }
  t1 = System.nanoTime
  logger.error("Starting of a single actor of " + n + ": " + ((t1 - t0) / 1000000.0 / n.toDouble) + " ms")
}

The result:

Starting of a single actor of 10000: 0.3642917 ms

There was a message stating that "Slf4jEventHandler started" between "HELOOOOOOOO" and "Starting of a single", so the experiment seems even more realistic (?)

Dispatchers was a default (a PinnedDispatcher starting a new thread each and every time), and it seemed like all that stuff is the same as Thread.start() was, for a long long time since Java 1 - 500K-1M cycles or so ^)

That's why I've changed all code inside loop, to a new java.lang.Thread().start()

The result:

Starting of a single actor of 10000: 0.1355219 ms
于 2012-12-11T00:14:18.043 回答
0

Actors 可以制作出色的有限状态机,因此让它们在这里帮助推动您的设计。如果您的请求处理状态通过每个请求有一个参与者而大大简化,那么就这样做。根据经验,我发现演员特别擅长管理两个以上的州。

但是,通常情况下,一个请求处理参与者从一个集合中引用请求状态,该集合作为其自身状态的一部分进行维护,这是一种常见的方法。请注意,这也可以通过 Akka 反应流和使用扫描阶段来实现。

于 2019-08-03T21:12:20.817 回答