4

我的目标是为我的小部件构建一个高度并发的后端。我目前将后端公开为 Web 服务,它接收运行特定小部件的请求(使用 Scalatra),从 DB 中获取小部件的代码并在演员(使用 Akka)中运行它,然后回复结果。所以想象我正在做类似的事情:

get("/run:id") {
  ...
  val actor = Actor.actorOf("...").start
  val result = actor !! (("Run",id), 10000)
  ...
} 

现在我相信这不是最好的并发解决方案,我应该以某种方式将监听请求和运行小部件结合到一个参与者实现中。你会如何设计它以获得最大的并发性?谢谢。

4

1 回答 1

5

您可以在 akka 引导文件或您自己的 ServletContextListener 中启动您的 actor,这样它们就可以在不绑定到 servlet 的情况下启动。然后您可以使用 akka 注册表查找它们。

Actor.registry.actorFor[MyActor] foreach { _ !! (("Run",id), 10000) }

除此之外,目前还没有 akka 与 scalatra 的真正集成。所以到目前为止,你能做的最好的事情就是对一群演员使用阻塞请求。

我不确定,但我不需要为每个请求生成一个参与者,而是拥有一个可以发送这些请求的小部件参与者池。如果您使用主管层次结构,那么如果池太大或太小,您可以使用主管来调整池的大小。

class MyContextListener extends ServletContextListener {

  def contextInitialized(sce: ServletContextEvent) {
    val factory = SupervisorFactory(
      SupervisorConfig(
      OneForOneStrategy(List(classOf[Exception]), 3, 1000),
      Supervise(actorOf[WidgetPoolSupervisor], Permanent)
  }

  def contextDestroyed(sce: ServletContextEvent) {
    Actor.registry.shutdownAll()
  }
}

class WidgetPoolSupervisor extends Actor {

  self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 3, 1000)

  override def preStart() {
    (1 to 5) foreach { _ =>
       self.spawnLink[MyWidgetProcessor]
    }
    Scheduler.schedule(self, 'checkPoolSize, 5, 5, TimeUnit.MINUTES)
  }

  protected def receive = {
    case 'checkPoolSize => {
      //implement logic that checks how quick the actors respond and if 
      //it takes to long add some actors to the pool.
      //as a bonus you can keep downsizing the actor pool until it reaches 1
      //or until the message starts returning too late.
    }
  }
}

class ScalatraApp extends ScalatraServlet {

  get("/run/:id") {
    // the !! construct should not appear anywhere else in your code except
    // in the scalatra action. You don't want to block anywhere else, but in a 
    // scalatra action it's ok as the web request itself is synchronous too and needs to 
    // to wait for the full response to have come back anyway.
    Actor.registry.actorFor[MyWidgetProcessor] foreach { 
      _ !! ((Run, id), 10000) 
    } getOrElse {
      throw new HeyIExpectedAResultException()
    } 
  }
}

请务必将上面的代码视为恰好看起来像 scala 的伪代码,我只是想说明这个概念。

于 2011-07-10T16:39:15.313 回答