1

我有以下代码遍历人员列表并在 class1 中为每个人调用回调。

def syncPeople(callback: Person => _) = Future {
   person.findAll(criteria).foldLeft(0L) { (count, obj) =>
      callback(obj)
      count + 1
   } 
}

回调和对 syncPeople 的调用在 class2 中,看起来与此类似

def getActor(person: Person):ActorRef = {
  if(person.isMale) maleActor
  else femaleActor
}

def process(person: Person): Unit = {
   val workActor = getActor(person)
   workActor ! person
} //The actor does the actual work and may be quite intense

def syncPeople(process)

现在,我想跟踪同步所有人所花费的总时间。即当最后一个 workActor 完成工作时。我正在使用第三个 Actor:MonitorActor 来跟踪开始和结束时间。MaleActor、FemaleActor 可以在处理个人时向此发送消息

跟踪这个产生的进程的最佳方法是什么?

我探索了

  1. Future.sequence // 但是向 workActor 发送消息的类不是演员。所以未来不会收到消息

  2. 在完成时跟踪 personIds,但不使用 var,在 MonitorActor 中累积接收到的消息,它不可能实现这一点.. 并且使用 var 不是首选的做事方式

实现这一点的其他方法可能是什么

4

2 回答 2

4

有趣的是,我目前正在研究与此非常相似的问题。我建议的解决方案是使用akka-fsm来跟踪状态。

基本上在你的状态对象之外,做一些事情,比如生成一个代表 id 的 Long:

def getId(): Long = System.currentTimeMillis() / 1000L

正确实现的状态对象是不可变的,因此您只需在整个事务中重复使用此 id。

我知道这个答案缺少很多实现细节,但我仍在自己的代码中进行实现。希望在阅读 akka-fsm 并使用它之后,这个答案会有意义吗?

于 2012-11-08T22:52:33.183 回答
1

不要妖魔化可变状态,它是 SHARED 可变状态,这会导致大部分问题。您没有在 Actor 内部共享可变状态,因为您总是与 actorRefs 对话,并且 Actor 一次只处理一条消息(没有竞争条件和其他邪恶的东西)。我的意思是,使用 a 是可以的var(除非你在演员内部产生一些期货,这会改变var,因为那样你就会回到 SHARING 可变状态)。正如@devnulled 建议的那样,FSM 是另一种解决方案,但对于您的用例来说,这听起来更像是一种矫枉过正。

于 2012-11-08T23:19:42.463 回答