7

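I have actors that need to do very long-running and computationally expensive work, but the computation itself can be done incrementally. So while the complete computation takes hours to finish, the intermediate results are actually very useful, and I'd like to be able to respond to any requests for them. This is pseudocode for what I want to do: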

var intermediateResult = ...
loop {
     while (mailbox.isEmpty && computationNotFinished)
       intermediateResult = computationStep(intermediateResult)


     receive {
         case GetCurrentResult => sender ! intermediateResult
         ...other messages...
     }
 }
4

4 Answers

8

The best way to do this is very close to what you are already doing:

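import akka.actor.Actor

// ToDo and IntermediateState are placeholders for your own types;
// ToDo only needs an isEmpty method. Work is the message that starts a job.
case class Work(todo: ToDo)
case class Continue(todo: ToDo)

class Worker extends Actor {
  var state: IntermediateState = _
  def receive = {
    case Work(todo) =>
      val (next, rest) = calc(state, todo)
      state = next
      self ! Continue(rest)
    case Continue(todo) if todo.isEmpty => // done
    case Continue(todo) =>
      val (next, rest) = calc(state, todo)
      state = next
      self ! Continue(rest)
  }
  // performs one bounded chunk of work; returns the new state and what is left to do
  def calc(state: IntermediateState, todo: ToDo): (IntermediateState, ToDo) = ???
}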
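
To see why this pattern keeps the worker responsive, here is a minimal sketch that needs no Akka at all. It assumes a toy single-threaded mailbox (a plain queue) in place of the real one, and the names `MailboxSketch`, `run` and `GetCurrentResult` are only illustrative. The point is that a query arriving mid-computation is queued behind at most one pending `Continue`, so it is answered between two steps.

```scala
import scala.collection.mutable

object MailboxSketch {
  sealed trait Msg
  case class Continue(remaining: Int) extends Msg
  case object GetCurrentResult extends Msg

  // Runs three doubling steps; one query is injected after the first turn.
  def run(): (BigInt, List[BigInt]) = {
    val mailbox = mutable.Queue[Msg](Continue(3)) // toy stand-in for the actor mailbox
    var state = BigInt(1)                         // the intermediate result
    val replies = mutable.ListBuffer.empty[BigInt]
    var turn = 0
    while (mailbox.nonEmpty) {
      mailbox.dequeue() match {
        case Continue(0) => ()                    // done, nothing is re-enqueued
        case Continue(n) =>
          state = state * 2                       // one computation step
          mailbox.enqueue(Continue(n - 1))        // plays the role of `self ! Continue(rest)`
        case GetCurrentResult =>
          replies += state                        // plays the role of `sender ! state`
      }
      turn += 1
      if (turn == 1) mailbox.enqueue(GetCurrentResult) // a query arrives mid-computation
    }
    (state, replies.toList)
  }
}
```

The query is served after the second step and sees the value 4, while the computation still runs to completion with 8.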

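Edit: more background

When an actor sends messages to itself, Akka's internal processing basically runs those messages in a while loop. The number of messages processed in one go is determined by the throughput setting of the actor's dispatcher (default 5). After that many messages have been processed, the thread is returned to the pool and the continuation is enqueued to the dispatcher as a new task. Hence there are two tunables in the above solution:

  • process multiple steps for a single message (if the processing steps are really small)
  • increase the throughput setting for increased throughput and decreased fairness

The original problem seems to have hundreds of such actors running, presumably on common hardware that does not have hundreds of CPUs, so the throughput setting should probably be chosen such that each batch takes no longer than ca. 10 ms.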
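
As a rough back-of-the-envelope aid for that 10 ms guideline, the sketch below assumes that one dispatcher turn costs about throughput times the time per message. `BatchBudget` and `throughputFor` are hypothetical helper names, not part of Akka; the resulting number would go into the dispatcher's `throughput` configuration setting.

```scala
object BatchBudget {
  // Assumption: one dispatcher turn takes roughly throughput * perMessageMicros.
  // Returns the largest throughput that keeps a turn within the budget, but at least 1.
  def throughputFor(perMessageMicros: Long, budgetMicros: Long = 10000L): Int = {
    require(perMessageMicros > 0, "per-message time must be positive")
    math.max(1L, budgetMicros / perMessageMicros).toInt
  }
}
```

For example, messages that take about 2 ms each fit the default throughput of 5, while messages that take 50 ms each already exceed the budget on their own, so only a smaller step per message would help there.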

Performance Assessment

Let's play a bit with Fibonacci:

Welcome to Scala version 2.10.0-RC1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_07).
Type in expressions to have them evaluated.
Type :help for more information.

scala> def fib(x1: BigInt, x2: BigInt, steps: Int): (BigInt, BigInt) = if(steps>0) fib(x2, x1+x2, steps-1) else (x1, x2)
fib: (x1: BigInt, x2: BigInt, steps: Int)(BigInt, BigInt)

scala> def time(code: =>Unit) { val start = System.currentTimeMillis; code; println("code took " + (System.currentTimeMillis - start) + "ms") }
time: (code: => Unit)Unit

scala> time(fib(1, 1, 1000))
code took 1ms

scala> time(fib(1, 1, 1000))
code took 1ms

scala> time(fib(1, 1, 10000))
code took 5ms

scala> time(fib(1, 1, 100000))
code took 455ms

scala> time(fib(1, 1, 1000000))
code took 17172ms

This means that in a presumably quite optimized loop, fib_100000 takes half a second. Now let's play a bit with actors:

scala> case class Cont(steps: Int, batch: Int)
defined class Cont

scala> val me = inbox()
me: akka.actor.ActorDSL.Inbox = akka.actor.dsl.Inbox$Inbox@32c0fe13

scala> val a = actor(new Act {
  var s: (BigInt, BigInt) = _
  become {
    case Cont(x, y) if y < 0 => s = (1, 1); self ! Cont(x, -y)
    case Cont(x, y) if x > 0 => s = fib(s._1, s._2, y); self ! Cont(x - 1, y)
    case _: Cont => me.receiver ! s
   }
})
a: akka.actor.ActorRef = Actor[akka://repl/user/$c]

scala> time{a ! Cont(1000, -1); me.receive(10 seconds)}
code took 4ms

scala> time{a ! Cont(10000, -1); me.receive(10 seconds)}
code took 27ms

scala> time{a ! Cont(100000, -1); me.receive(10 seconds)}
code took 632ms

scala> time{a ! Cont(1000000, -1); me.receive(30 seconds)}
code took 17936ms

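This is already interesting: given a long enough time per step (with huge BigInts behind the scenes in the last line), actors add little overhead. Now let's see what happens when smaller calculations are done in a more batched fashion: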

scala> time{a ! Cont(10000, -10); me.receive(30 seconds)}
code took 462ms

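This is pretty close to the result of the direct variant above (455 ms for the same 100000 steps).

Conclusion

Sending messages to self is not expensive for almost all applications; just keep each actual processing step slightly larger than a few hundred nanoseconds.

answered 2012-10-13T04:10:55.193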
4

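I assume from your comment on Roland Kuhn's answer that you have some work that can be considered recursive, at least in blocks. If that is not the case, I don't think there is any clean solution to your problem, and you will have to deal with complicated pattern-matching blocks.

If my assumption is correct, I would schedule the computation asynchronously and leave the actor free to answer other messages. The key point is to use the monadic capabilities of Future and keep the receive block simple. You have to handle three messages (StartComputation, ChangeState, GetState).

You end up with the following receive: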

def receive = {
  case StartComputation(myData) => expensiveStuff(myData)
  case ChangeState(newState) => this.state = newState
  case GetState => sender ! this.state
}

Then you can leverage the map method on Future by defining your own recursive map:

 def mapRecursive[A](f:Future[A], handler: A => A, exitConditions: A => Boolean):Future[A] = {
    f.flatMap {  a=>
                 if (exitConditions(a))
                   f
                 else {
                     val newFuture = f.flatMap{ a=> Future(handler(a))}
                     mapRecursive(newFuture,handler,exitConditions)
                 }

              }
  }

Once you have this tool, everything becomes easier. Take a look at the following example:

def main(args:Array[String]){
    val baseFuture:Future[Int] = Promise.successful(64)
    val newFuture:Future[Int] = mapRecursive(baseFuture,
                                 (a:Int) => {
                                   val result = a/2
                                   println("Additional step done: the current a is " + result)
                                   result
                                 }, (a:Int) => (a<=1))

    val one = Await.result(newFuture,Duration.Inf)
    println("Computation finished, result = " + one)



  }

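Its output is:

Additional step done: the current a is 32
Additional step done: the current a is 16
Additional step done: the current a is 8
Additional step done: the current a is 4
Additional step done: the current a is 2
Additional step done: the current a is 1
Computation finished, result = 1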

As you can see, you can do the same thing inside your expensiveStuff method:

  def expensiveStuff(myData:MyData):Future[MyData]= {
    val firstResult = Promise.successful(myData)
    val handler : MyData => MyData = (myData) => {
      val result = myData.copy(myData.value/2)
      self ! ChangeState(result)
      result
    }
    val exitCondition : MyData => Boolean = (myData:MyData) => myData.value==1
    mapRecursive(firstResult,handler,exitCondition)
  }
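
The snippets above appear to target the Akka futures of the time, where a Promise could be used as a Future. As a minimal sketch of the same idea with the standard library's `scala.concurrent` (an assumption on my part, not the original code), the version below runs one Future per step and hands every intermediate value to a callback. `FutureSteps`, `iterate` and `publish` are hypothetical names; `publish` stands in for `self ! ChangeState(result)`.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FutureSteps {
  // Runs `step` repeatedly, one Future per step, until `done` holds.
  // Each intermediate value is passed to `publish` before the next step is scheduled.
  def iterate[A](a: A)(step: A => A, done: A => Boolean, publish: A => Unit)
                (implicit ec: ExecutionContext): Future[A] =
    if (done(a)) Future.successful(a)
    else Future(step(a)).flatMap { next =>
      publish(next)
      iterate(next)(step, done, publish)
    }

  // Halves 64 until it reaches 1 and records every published value in order.
  def demo(): (Int, List[Int]) = {
    import ExecutionContext.Implicits.global
    val seen = new AtomicReference(List.empty[Int])
    val last = Await.result(
      iterate(64)(_ / 2, _ <= 1, x => seen.set(x :: seen.get)),
      10.seconds)
    (last, seen.get.reverse)
  }
}
```

Because each step is scheduled only after the previous one has completed, the published values arrive in order. Inside an actor, `publish` would send a message rather than touch shared state.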

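Edit - more detail

An actor processes the messages from its mailbox in a thread-safe and synchronous manner. If you don't want to block it, the only thing you can do is have the computation executed on a different thread. That is exactly what a high-performance non-blocking receive amounts to.

However, you were right to say that the approach I proposed pays a high performance penalty. Every step is done in a different future, which might not be necessary at all. You can therefore recurse the handler to obtain either single-threaded or multi-threaded execution. There is no magic formula after all:

  • If you want to schedule asynchronously and minimize the cost, all the work should be done by a single thread.
  • This, however, could prevent other work from starting, because futures are queued when all the threads of the thread pool are taken. You may therefore want to break the operation up into multiple futures, so that even at full usage some new work can be scheduled before old work has completed.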

def recurseFuture[A](entryFuture: Future[A], handler: A => A, exitCondition: A => Boolean, maxNestedRecursion: Long = Long.MaxValue): Future[A] = {
        def recurse(a:A, handler: A => A, exitCondition: A => Boolean, maxNestedRecursion: Long, currentStep: Long): Future[A] = {
          if (exitCondition(a))
            Promise.successful(a)
          else
            if (currentStep==maxNestedRecursion)
              Promise.successful(handler(a)).flatMap(a => recurse(a,handler,exitCondition,maxNestedRecursion,0))
            else{
              recurse(handler(a),handler,exitCondition,maxNestedRecursion,currentStep+1)
            }
        }
        entryFuture.flatMap { a => recurse(a,handler,exitCondition,maxNestedRecursion,0) }
      }

For testing purposes I enhanced my handler method:

  val handler: Int => Int = (a: Int) => {
      val result = a / 2
      println("Additional step done: the current a is " + result + " on thread " + Thread.currentThread().getName)
      result
    }

Approach 1: recurse the handler on itself so that everything is executed on a single thread.

    println("Starting strategy with all the steps on the same thread")
    val deepestRecursion: Future[Int] = recurseFuture(baseFuture,handler, exitCondition)
    Await.result(deepestRecursion, Duration.Inf)
    println("Completed strategy with all the steps on the same thread")
    println("")

Approach 2: recurse the handler on itself only to a limited depth.

println("Starting strategy with the steps grouped by three")
val threeStepsInSameFuture: Future[Int] = recurseFuture(baseFuture,handler, exitCondition,3)
val threeStepsInSameFuture2: Future[Int] = recurseFuture(baseFuture,handler, exitCondition,4)
Await.result(threeStepsInSameFuture, Duration.Inf)
Await.result(threeStepsInSameFuture2, Duration.Inf)
println("Completed strategy with all the steps grouped by three")
executorService.shutdown()
answered 2012-10-15T12:23:24.650
2

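You should not use actors for long-running computations, as these block the threads that are supposed to run the actors' code.

I'd try a design that uses a separate Thread/ThreadPool for the computations and uses AtomicReferences to store and query the intermediate results, along the lines of the following pseudocode: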

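import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

val cancelled = new AtomicBoolean(false)
val intermediateResult = new AtomicReference[IntermediateResult]()

// runs the computation off the actor's thread, publishing the result of each step
object WorkerThread extends Thread {
  override def run(): Unit = {
    while (!cancelled.get) {
      intermediateResult.set(computationStep(intermediateResult.get))
    }
  }
}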

loop {
  react {
    case StartComputation => WorkerThread.start()
    case CancelComputation => cancelled.set(true)
    case GetCurrentResult => sender ! intermediateResult.get
  }
}
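
A minimal runnable sketch of the worker half of this design, using only the standard library, could look as follows. `BackgroundWorker`, `start`, `current` and `cancel` are hypothetical names, and the actor part is left out; an actor would simply call `current` when it receives GetCurrentResult.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Runs `step` in a loop on its own thread and publishes every intermediate value.
class BackgroundWorker[A](initial: A)(step: A => A) {
  private val cancelled = new AtomicBoolean(false)
  private val latest = new AtomicReference[A](initial)
  private val thread = new Thread(new Runnable {
    def run(): Unit =
      while (!cancelled.get) latest.set(step(latest.get))
  })
  thread.setDaemon(true) // do not keep the JVM alive for an abandoned computation

  def start(): Unit = thread.start()
  def current: A = latest.get   // safe to call from any thread
  def cancel(): A = {           // stops the loop and returns the final value
    cancelled.set(true)
    thread.join()
    latest.get
  }
}
```

Usage is a matter of `new BackgroundWorker(BigInt(0))(_ + 1)`, then `start()`, and reading `current` whenever a request comes in. Only the worker thread writes the reference, so plain `set` is sufficient here.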
answered 2012-10-12T10:32:01.820
1

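This is a classic concurrency problem. You want several routines/actors (or whatever you want to call them). The code below is mostly correct Go, with deliberately long variable names for context. The first routine handles queries and intermediate results: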

func serveIntermediateResults(
        computationChannel chan *IntermediateResult,
        queryChannel chan chan<-*IntermediateResult) {
    var latestIntermediateResult *IntermediateResult // initial result
    for {
        select {
        // an update arrives
        case result, notClosed := <-computationChannel:
            if !notClosed {
                // the computation has finished, stop checking
                computationChannel = nil
            } else {
                // assign to the outer variable; := in the case clause would shadow it
                latestIntermediateResult = result
            }
        // a query arrived
        case queryResponseChannel, notClosed := <-queryChannel:
            if !notClosed {
                // no more queries, so we're done
                return
            }
            // respond with the latest result
            queryResponseChannel<-latestIntermediateResult
        }
    }
}

In your long computation, you update the intermediate result wherever appropriate:

func longComputation(intermediateResultChannel chan *IntermediateResult) {
    for notFinished {
        // lots of stuff
        intermediateResultChannel<-currentResult
    }
    close(intermediateResultChannel)
}

Finally, to ask for the current result, you have a wrapper that makes this nicer:

func getCurrentResult() *IntermediateResult {
     responseChannel := make(chan *IntermediateResult)
     // queryChannel was given to the intermediate result server routine earlier
     queryChannel<-responseChannel
     return <-responseChannel
}
answered 2012-10-12T05:32:18.093