0

我对此完全陌生,但这里有一个 Scala 演员,它是用我的班级创建的,只为其他演员生成消息。我希望它每秒都醒来,收集一些指标,将它们发送给其他参与者,然后重新入睡。终止应用程序时,我需要向该参与者发送消息以退出

class Node() {
  println("A new Node is being constructed")
  val load_monitor: Actor = actor {
    val s: Sigar = new Sigar
    while (true) {
      Thread.sleep(1000);

      // Will be replaced with something like 
      // load_manager | s.getCpuSystem
      println(s.getCpuPerc)

      self.receiveWithin(100) { 
        case "EXIT" => exit() 
        case TIMEOUT => {}
      } 
    }
  }

  // Other Node code below...
}

这似乎有效,但我不相信它是正确的。调用 Thread.sleep 聪明吗?如果这是一个完整的线程,我现在有一个睡眠线程。如果这是一个事件,我现在是否阻止了一些事件处理队列?我可以使用接收/反应的东西更正确地做到这一点吗?我无法理解接收/反应是否有效地停止了演员的执行,这意味着我无法每秒醒来检查指标。下面的代码我也考虑过,不知道是不是比上面的代码好!

class Node() {
  println("A new Node is being constructed")
  val load_monitor: Actor = actor {
    val s: Sigar = new Sigar
    while (true) {
      self.receiveWithin(1000) { 
        case "EXIT" => exit() 
        // Will be replaced with something like 
        // load_manager | s.getCpuSystem
        case TIMEOUT => println(s.getCpuPerc)
      } 
    }
  }

  // Other Node code below...
}
4

3 回答 3

3

首先,我强烈推荐使用Akka,从 Scala 2.10 开始,它已经取代了 Scala Actors。Akka 是 Actor 范式的独立实现,并非基于 Scala Actors。

话虽这么说,我将根据 Akka 2.x 来回答,既因为我最熟悉 Akka 2.x,也因为它可能对你更有用,因为 Akka 现在是事实上的 Actor Scala(也许是整个 JVM)的实现。

Actor 范式很容易理解——它基于两个主要思想:封装的可变性和消息传递。每个 Actor 都封装了它自己的状态,除了 Actor 本身之外,其他任何东西都不应触及 Actor 的内部状态。参与者通过消息进行通信。如果某个 Actor A 想要改变封装在 Actor B 内的变量(例如,也许 A 代表一个客户,B 代表客户的银行账户),A 将向 B 发送一条消息,表明这一点 - 请注意,B 可以随意忽略该消息。

Actor 不会一直占用线程,他们只会在邮件在他们的邮箱中等待他们时“醒来”。因此,调用 toThread.sleep不仅没有必要,而且不推荐。

在Akka 中,消息是松散类型Any的(receive

您可以为每个 Actor 创建一个调度程序,每 x 秒向自己发送一条消息(Akka 有工具可以让这变得非常简单)。此消息将触发 Actor 唤醒,读取邮箱中的消息(可能是指示它应该收集统计信息的消息),并对其采取行动。

要将消息广播给其他几个 Actor,有很多方法可以做到这一点。一种简单的方法是在度量收集 Actor 中保留某种ActorRef(Akka 特定的)集合,并且每次 Actor 醒来收集统计数据时,它只是向Actor与该集合对应的每个对象发送一条消息。还有很多其他方法可以做到这一点。

如果您真的对在 Scala(或一般的 JVM)中使用 Actors 感兴趣,我强烈推荐 Derek Wyatt 的书Akka Concurrency

于 2013-02-10T08:44:19.313 回答
2

在演员中使用while(true)andThread.sleep不是一个好主意。你不想阻止。我会定期向演员发送一条消息,然后它会对此做出反应。

例子:

import scala.actors.Actor

import java.util.Timer
import java.util.TimerTask

case object DoSomething

class MyActor extends Actor {
  def act() {
    loop {
      react {
        case DoSomething =>
          // do measurement
          println(Runtime.getRuntime.freeMemory + " bytes free")

        case 'kill =>
          exit()
      }
    }
  }
}

val actor = new MyActor
actor.start

val timer = new Timer(true)

timer.schedule(new TimerTask {
  def run {
    actor ! DoSomething
  }
}, 1000, 1000)

我还建议您改用akka。使用 akka,同样的事情看起来像这样(未经测试):

import akka.actor._
import scala.concurrent.duration._

case object DoSomething

class MyActor extends Actor {

  context.scheduler.schedule(1 second, 1 second) {
    self ! DoSomething
  }

  def receive = {
    case DoSomething =>
      // do measurement
      println(Runtime.getRuntime.freeMemory + " bytes free")
  }
}

val system = new ActorSystem("MySystem")

system.actorOf(Props[MyActor])

要杀死一个 akka 演员,您可以随时向它发送akka.actor.PoisonPill,这将在处理完队列中的所有剩余消息后杀死该演员。

于 2013-02-10T08:27:33.253 回答
1

我对此完全陌生

在这种情况下,正如 adlbertc 所说,你真的应该从 Akka 演员开始,除非你坚持使用 Scala 演员来维护遗留应用程序的要求(因为 Scala 演员在当前的 Scala 版本中已被弃用,并将在下一个版本中删除) .

调用 Thread.sleep 聪明吗?如果这是一个完整的线程,我现在有一个睡眠线程。

不,如果你打电话,Thread.sleep总是有一个睡眠线程。Actor 不会以某种方式改变现有方法的含义!

您的第二个选项更好,但receiveWithin意味着该演员将在自己的线程中运行,更好地使用reactWithin并查看此答案

使用 Akka 演员,您可以只使用Scheduler

于 2013-02-10T08:27:23.850 回答