19

我是 Akka 的初学者。我需要在每天的固定时间安排一项任务,比如早上 8 点。例如,我知道如何定期安排任务

import akka.util.duration._

scheduler.schedule(0 seconds, 10 minutes) {
  doSomething()
}

在 Akka 中在一天中的固定时间安排任务的最简单方法是什么?

一个小括号

使用此功能很容易做我想做的事。玩具实现看起来像

scheduler.schedule(0 seconds, 24 hours) {
  val now = computeTimeOfDay()
  val delay = desiredTime - now

  scheduler.scheduleOnce(delay) {
    doSomething()
  }
}

这并不难,但我介绍了一点比赛条件。事实上,考虑一下如果我在早上 8 点之前启动它会发生什么。外部关闭将开始,但到我计算时,delay我们可能会在上午 8 点之后。这意味着应该立即执行的内部关闭将被推迟到明天,从而跳过一天的执行。

有一些方法可以解决这种竞争条件:例如,我可以每 12 小时执行一次检查,而不是立即安排任务,而是将其发送给一次不会接受多个任务的参与者。

但很可能,这已经存在于 Akka 或某些扩展中。

4

4 回答 4

7

一次编写,每天运行

val GatherStatisticsPeriod = 24 hours

private[this] val scheduled = new AtomicBoolean(false)

def calcBeforeMidnight: Duration = { 
  // TODO implement 
} 

def preRestart(reason: Throwable, message: Option[Any]) {
  self ! GatherStatisticsScheduled(scheduled.get)
  super.preRestart(reason, message)
}

def schedule(period: Duration, who: ActorRef) = 
  ServerRoot.actorSystem.scheduler
    .scheduleOnce(period)(who ! GatherStatisticsTick)

def receive = {

  case StartServer(nodeName) => 
    sender ! ServerStarted(nodeName)
    if (scheduled.compareAndSet(false, true)) 
      schedule(calcBeforeMidnight, self)

  case GatherStatisticsTick =>
    stats.update
    scheduled.set(true)
    schedule(GatherStatisticsPeriod, self) 

  case GatherStatisticsScheduled(isScheduled) =>
    if (isScheduled && scheduled.compareAndSet(false, isScheduled))
      schedule(calcBeforeMidnight, self)

}

我相信 Akka 的调度程序以一种或另一种方式在内部处理重启。我使用非持久方式向自己发送消息 - 实际上没有严格的传递保证。此外,刻度可能会有所不同,因此 GatherStatisticsPeriod 可能是一个函数。

于 2012-12-05T01:03:04.700 回答
6

要在 Akka 中使用这种调度,您必须自己推出或者使用 Quartz,通过Akka Camelakka 的原型石英

如果您不需要任何花哨且极其准确的东西,那么我只需计算所需的第一次延迟并将其用作调度调用的开始延迟,并相信间隔。

于 2012-12-05T11:29:57.987 回答
6

假设您想在每天下午 13 点运行您的任务。

import scala.concurrent.duration._
import java.time.LocalTime

val interval = 24.hours
val delay = {
  val time = LocalTime.of(13, 0).toSecondOfDay
  val now = LocalTime.now().toSecondOfDay
  val fullDay = 60 * 60 * 24
  val difference = time - now
  if (difference < 0) {
    fullDay + difference
  } else {
    time - now
  }
}.seconds

system.scheduler.schedule(delay, interval)(doSomething())

另请记住,服务器时区可能与您的不同。

于 2018-02-16T10:40:30.407 回答
1

只是为了添加另一种方法来实现它,这可以使用Akka Streams通过勾选消息并按时过滤来完成。

Source
  .tick(0.seconds, 2.seconds, "hello") // emits "hello" every two seconds
  .filter(_ => {
    val now = LocalDateTime.now.getSecond
    now > 20 && now < 30 // will let through only if the timing is right.
  })
  .runForeach(n => println("final sink received " + n))
于 2018-07-15T08:01:30.533 回答