1

我有一个初始化成本和内存占用很大的对象。初始化时间是人类可察觉的,但创建频率很低。

class HeavyClass {
  heavyInit()
}

我的解决方案是创建一个 Provider actor,它会提前创建一个对象,并根据请求立即提供它。然后提供者将继续创建下一个对象。

class HeavyClassProvider extends Actor {

  var hc: Option[HeavyClass] = Some(new HeavyClass())

  override def receive = {
    case "REQUEST" =>
      sender ! { hc getOrElse new HeavyClass() }
      self ! "RESPAWN"
      hc = None

    case "RESPAWN" if (hc == None) => hc = Some(new HeavyClass())
  }

}

还有一个消费者:

abstract class HeavyClassConsumer extends Actor {

  import context.dispatcher

  import akka.pattern.ask
  import scala.concurrent.duration._
  import akka.util.Timeout

  implicit val timeout = Timeout(5, SECONDS)

  var provider: ActorRef
  var hc: Option[HeavyClass] = None

  override def receive = {
    case "START" =>
      ((provider ask "REQUEST").mapTo[HeavyClass]
       onSuccess { case h: HeavyClass => hc = Some(h) })
  }

}

这是一种常见的模式吗?代码感觉很古怪,有没有明显更干净的方法?

4

2 回答 2

1

我怀疑它更经常使用某种同步工厂来完成,但参与者似乎与任何同步机制一样好,特别是如果调用代码已经建立在异步模式上。

当前实现的一个潜在问题是它无法并行创建HeavyClass“一次”请求的多个对象。这可能是一个特性,并且并行创建多个特性会使系统陷入困境。另一方面,如果它“只是慢”,您可能希望将“按需”实例的创建分拆到它自己的线程/参与者中。

于 2014-07-27T02:39:21.807 回答
1

您的解决方案的问题是,当您调用new HeavyClass()您的参与者时,您的参与者将阻塞,直到它处理该计算。在 Future 或另一个 Actor 中执行此操作可以避免这种情况。这是一种方法:

import akka.pattern.pipe
...

class HeavyClassProvider extends Actor {

  // start off async computation during init:
  var hc: Future[HeavyClass] = Future(new HeavyClass)

  override def receive = {
    case "REQUEST" =>
      // send result to requester when it's complete or
      // immediately if its already complete:
      hc pipeTo sender
      // start a new computation and send to self:
      Future(new HeavyClass) pipeTo self
    case result: HeavyClass => // new result is ready
      hc = Future.successful(result) // update with newly computed result
    case Status.Failure(f) => // computation failed
      hc = Future.failed[HeavyClass](f)
      // maybe request a recomputation again
  }
}

(我没有编译)

我的第一个解决方案的一个特点是它不限制同时计算多少个期货。如果您收到多个请求,它将计算多个可能不合需要的未来,尽管此 Actor 中没有竞争条件。为了限制这一点,只需在 Actor 中引入一个布尔标志,告诉您是否已经在计算某些东西。此外,所有这些vars 都可以替换为become/unbecome行为。

给定多个请求的单个并发 Future 计算示例:

import akka.pattern.pipe
...

class HeavyClassProvider extends Actor {

  // start off async computation during init:
  var hc: Future[HeavyClass] = Future(new HeavyClass) pipeTo self
  var computing: Boolean = true

  override def receive = {
    case "REQUEST" =>
      // send result to requester when it's complete or
      // immediately if its already complete:
      hc pipeTo sender
      // start a new computation and send to self:
      if(! computing)
        Future(new HeavyClass) pipeTo self
    case result: HeavyClass => // new result is ready
      hc = Future.successful(result) // update with newly computed result
      computing = false
    case Status.Failure(f) => // computation failed
      hc = Future.failed[HeavyClass](f)
      computing = false
      // maybe request a recomputation again
  }
}

编辑:在评论中进一步讨论要求之后,这里是另一个实现,它以非阻塞方式在每个请求上向发送者/客户端发送一个新对象:

import akka.pattern.pipe
...

class HeavyClassProvider extends Actor {
  override def receive = {
    case "REQUEST" =>
      Future(new HeavyClass) pipeTo sender
  }
}

然后可以简化为:

object SomeFactoryObject {
  def computeLongOp: Future[HeavyClass] = Future(new HeavyClass)
}

在这种情况下,不需要演员。在这些情况下,使用 Actor 作为同步机制和非阻塞计算的目的是让该 Actor 缓存结果并提供具有更复杂逻辑的异步计算Future,否则Future就足够了。

于 2014-07-27T04:09:03.827 回答