16

让演员睡觉的最佳方式是什么?我将参与者设置为想要维护数据库的不同部分(包括从外部来源获取数据)的代理。出于多种原因(包括不使数据库或通信超载和一般负载问题),我希望参与者在每个操作之间休眠。我正在看类似 10 个演员对象的东西。

演员将几乎无限运行,因为总会有新数据进入,或者坐在表中等待传播到数据库的其他部分等。这个想法是让数据库在任何时候都尽可能完整及时。

我可以用一个无限循环来做到这一点,并在每个循环结束时休眠,但根据http://www.scala-lang.org/node/242演员使用一个线程池,只要所有线程都被阻塞,它就会扩展. 因此,我认为每个参与者中的 Thread.sleep 都是一个坏主意,因为这会不必要地浪费线程。

我也许可以有一个有自己的循环的中心参与者,它在时钟上向订阅者发送消息(如异步事件时钟观察者)?

有没有人做过类似的事情或有什么建议?很抱歉提供额外(也许是多余的)信息。

干杯

4

4 回答 4

21

在第一个答案中,Erlang 有一个好点,但它似乎消失了。你可以很容易地用 Scala 演员做同样的类似 Erlang 的把戏。例如,让我们创建一个不使用线程的调度程序:

import actors.{Actor,TIMEOUT}

def scheduler(time: Long)(f: => Unit) = {
  def fixedRateLoop {
    Actor.reactWithin(time) {
      case TIMEOUT => f; fixedRateLoop
      case 'stop => 
    }
  }
  Actor.actor(fixedRateLoop)
}

让我们使用一个测试客户端参与者来测试它(我在 Scala REPL 中做到了):

case class Ping(t: Long)

import Actor._
val test = actor { loop {
  receiveWithin(3000) {
    case Ping(t) => println(t/1000)
    case TIMEOUT => println("TIMEOUT")
    case 'stop => exit
  }
} }

运行调度程序:

import compat.Platform.currentTime
val sched = scheduler(2000) { test ! Ping(currentTime) }

你会看到这样的东西

scala> 1249383399
1249383401
1249383403
1249383405
1249383407

这意味着我们的调度程序按预期每 2 秒发送一条消息。让我们停止调度程序:

sched ! 'stop

测试客户端将开始报告超时:

scala> TIMEOUT
TIMEOUT
TIMEOUT

也停止它:

test ! 'stop
于 2009-08-04T11:00:10.817 回答
17

没有必要显式地使参与者休眠:对每个参与者使用loopandreact意味着底层线程池将有等待线程,而参与者没有消息要处理。

如果您想为您的演员安排事件来处理,这很容易使用实用程序中的单线程调度java.util.concurrent程序:

object Scheduler {
  import java.util.concurrent.Executors
  import scala.compat.Platform
  import java.util.concurrent.TimeUnit
  private lazy val sched = Executors.newSingleThreadScheduledExecutor();
  def schedule(f: => Unit, time: Long) {
    sched.schedule(new Runnable {
      def run = f
    }, time , TimeUnit.MILLISECONDS);
  }
}

您可以扩展它以执行定期任务,并且可以这样使用它:

val execTime = //...  
Scheduler.schedule( { Actor.actor { target ! message }; () }, execTime)

然后,您的目标参与者将只需要实现一个适当的react循环来处理给定的消息。你不需要让任何演员睡觉。

于 2009-08-03T22:43:27.300 回答
4

来自 lift-util 的 ActorPing(Apache 许可证)有 schedule 和 scheduleAtFixedRate 来源:ActorPing.scala

从斯卡拉多克:

ActorPing 对象安排一个演员在特定的时间间隔用给定的消息进行 ping。schedule 方法返回一个 ScheduledFuture 对象,如果需要,可以取消该对象

于 2009-08-07T10:04:19.787 回答
2

不幸的是,oxbow_lakes 的答案有两个错误。

一个是简单的声明错误(long time vs time: Long),但第二个更微妙。

oxbow_lakes 声明运行为

def run = actors.Scheduler.execute(f) 

然而,这会导致消息不时消失。那就是:他们被安排但永远不会发送。声明运行为

def run = f

为我修好了。它在lift-util 的ActorPing 中以完全相同的方式完成。

整个调度程序代码变为:

object Scheduler {
    private lazy val sched = Executors.newSingleThreadedScheduledExecutor();
    def schedule(f: => Unit, time: Long) {
        sched.schedule(new Runnable {
          def run = f
        }, time - Platform.currentTime, TimeUnit.MILLISECONDS);
    }
}

我试图编辑 oxbow_lakes 帖子,但无法保存(坏了?),我还没有评论的权利。因此,一个新的职位。

于 2011-11-11T21:16:43.687 回答