15

我有一个演员,它创建一个子演员来执行一些冗长的计算。

问题是子actor的初始化需要几秒钟,并且父actor发送给子actor的所有消息都被创建并被完全初始化。

这是我正在使用的代码的逻辑:

class ChildActor extends Actor {
  val tagger = IntializeTagger(...) // this takes a few seconds to complete

  def receive = {
    case Tag(text) => sender ! tagger.tag(text)
    case "hello" => println("Hello")
    case _ => println("Unknown message")
  }
}

class ParentActor extends Actor {
  val child = context.ActorOf(Props[ChildActor], name = "childactor")

  // the below two messages seem to get lost
  child ! "hello"
  child ! Tag("This is my sample text")

  def receive = {
     ...
  }
}

我怎么能解决这个问题?是否可以让父演员等到孩子完全初始化?我将使用带有路由的子actor,并且可能在远程actor系统上使用。

编辑

按照 drexin 的建议,我将代码更改为:

class ChildActor extends Actor {
  var tagger: Tagger = _

  override def preStart() = {
    tagger = IntializeTagger(...) // this takes a few seconds to complete
  }

  def receive = {
    case Tag(text) => sender ! tagger.tag(text)
    case "hello" => println("Hello")
    case _ => println("Unknown message")
  }
}

class ParentActor extends Actor {
  var child: ActorRef = _

  override def preStart() = {
    child = context.ActorOf(Props[ChildActor], name = "childactor")

    // When I add
    // Thread.sleep(5000)
    // here messages are processed without problems

    // wihout hardcoding the 5 seconds waiting 
    // the below two messages seem to get lost
    child ! "hello"
    child ! Tag("This is my sample text")
  }

  def receive = {
     ...
  }
}

但问题仍然存在。我错过了什么?

4

3 回答 3

18

不要tagger在构造函数中初始化,而是在preStart钩子中初始化,这样消息将被收集到消息框中并在参与者准备好时传递。

编辑

你应该在你的ParentActor类中创建actor,因为你会遇到同样的问题,如果在初始化ChildActor之前会响应。ParentActor

编辑2

我创建了一个简单的示例,但无法重现您的问题。以下代码工作得很好:

import akka.actor._

case class Tag(x: String)

class ChildActor extends Actor {
  type Tagger = String => String
  var tagger: Tagger = _

  override def preStart() = {
    tagger = (x: String) => x+"@tagged" // this takes a few seconds to complete
    Thread.sleep(2000) // simulate time taken to initialize Tagger
  }

  def receive = {
    case Tag(text) => sender ! tagger(text)
    case "hello" => println("Hello")
    case _ => println("Unknown message")
  }
}

class ParentActor extends Actor {
  var child: ActorRef = _

  override def preStart() = {
    child = context.actorOf(Props[ChildActor], name = "childactor")

    // When I add
    // Thread.sleep(5000)
    // here messages are processed without problems

    // wihout hardcoding the 5 seconds waiting 
    // the below two messages seem to get lost
    child ! "hello"
    child ! Tag("This is my sample text")
  }

  def receive = {
    case x => println(x)
  }
}

object Main extends App {

  val system = ActorSystem("MySystem")

  system.actorOf(Props[ParentActor])
}

输出是:

[info] Running Main
Hello
This is my sample text@tagged
于 2013-06-12T09:39:57.537 回答
9

我认为您可能正在寻找的是Stashand的组合become。这个想法是子actor将它的初始状态设置为未初始化,并且在这种状态下,它将隐藏所有传入的消息,直到它完全初始化。完全初始化后,您可以在将行为切换到初始化状态之前取消存储所有消息。一个简单的例子如下:

class ChildActor2 extends Actor with Stash{
  import context._
  var dep:SlowDependency = _

  override def preStart = {
    val me = context.self
    Future{
      dep = new SlowDependency
      me ! "done"
    }
  }

  def uninitialized:Receive = {
    case "done" => 
      unstashAll
      become(initialized) 
    case other => stash()
  }

  def initialized:Receive = {
    case "a" => println("received the 'a' message")
    case "b" => println("received the 'b' message")   
  }

  def receive = uninitialized
}

请注意preStart,我正在异步进行初始化,以免停止actor的启动。现在这有点难看,关闭了可变变量dep。您当然可以通过向另一个处理慢依赖的实例化并将其发送回该actor的消息发送消息来处理它。收到依赖后,它将调用become状态initialized

现在有一个警告,Stash我将直接从 Akka 文档中粘贴它:

Please note that the Stash can only be used together with actors that 
have a deque-based mailbox. For this, configure the mailbox-type of the 
dispatcher to be a deque-based mailbox, such as 
akka.dispatch.UnboundedDequeBasedMailbox (see Dispatchers (Scala)). 

现在,如果这不适合您,您可以尝试更多DI类型的方法,并通过它的构造函数将慢速依赖注入到子 Actor 中。所以你可以像这样定义子actor:

class ChildActor(dep:SlowDependency) extends Actor{
    ...
} 

然后在启动这个actor时,你会这样做:

context.actorOf(new Props().withCreator(new ChildActor(slowDep)), name = "child-actor")
于 2013-06-12T12:05:29.187 回答
0

我建议从子actor向父actor发送一条“就绪”消息,并在收到此消息后才开始向子actor发送消息。您可以仅在receive()简单用例的方法中执行此操作,也可以在子级初始化后使用becomeFSM更改父级actor行为(例如,将子级的消息存储在某个中间存储中,并在准备就绪时全部发送)。

于 2013-06-12T11:31:19.813 回答