我从您对 Roland Kuhn 的回答中假设您有一些工作可以被认为是递归的,至少在块中。如果不是这种情况,我认为没有任何干净的解决方案可以解决您的问题,您将不得不处理复杂的模式匹配块。
如果我的假设是正确的,我将异步安排计算并让参与者可以自由地回答其他消息。关键是使用 Future monadic 功能并拥有一个简单的接收块。您必须处理三个消息(startComputation、changeState、getState)
您最终将收到以下信息:
def receive {
case StartComputation(myData) =>expensiveStuff(myData)
case ChangeState(newstate) = this.state = newstate
case GetState => sender ! this.state
}
然后您可以通过定义自己的递归映射来利用 Future 上的 map 方法:
def mapRecursive[A](f:Future[A], handler: A => A, exitConditions: A => Boolean):Future[A] = {
f.flatMap { a=>
if (exitConditions(a))
f
else {
val newFuture = f.flatMap{ a=> Future(handler(a))}
mapRecursive(newFuture,handler,exitConditions)
}
}
}
一旦你有了这个工具,一切都会变得更容易。如果您查看以下示例:
def main(args:Array[String]){
val baseFuture:Future[Int] = Promise.successful(64)
val newFuture:Future[Int] = mapRecursive(baseFuture,
(a:Int) => {
val result = a/2
println("Additional step done: the current a is " + result)
result
}, (a:Int) => (a<=1))
val one = Await.result(newFuture,Duration.Inf)
println("Computation finished, result = " + one)
}
它的输出是:
完成的附加步骤:当前 a 为 32
完成的附加步骤:当前 a 为 16
完成的附加步骤:当前 a 为 8
附加步骤完成:当前 a 为 4
完成的附加步骤:当前 a 为 2
完成的附加步骤:当前 a 为 1
计算完成,结果 = 1
你明白你可以在你的expensiveStuff
方法中做同样的事情
def expensiveStuff(myData:MyData):Future[MyData]= {
val firstResult = Promise.successful(myData)
val handler : MyData => MyData = (myData) => {
val result = myData.copy(myData.value/2)
self ! ChangeState(result)
result
}
val exitCondition : MyData => Boolean = (myData:MyData) => myData.value==1
mapRecursive(firstResult,handler,exitCondition)
}
编辑 - 更详细
如果您不想阻塞以线程安全和同步方式处理来自其邮箱的消息的 Actor,您唯一能做的就是让计算在不同的线程上执行。这正是一个高性能的非阻塞接收。
但是,您说得对,我提出的方法会付出很高的性能代价。每一步都是在不同的未来完成的,这可能根本没有必要。因此,您可以递归处理程序以获得单线程或多线程执行。毕竟没有神奇的公式:
- 如果你想异步调度并最小化成本,所有工作都应该由单个线程完成
- 然而,这可能会阻止其他工作开始,因为如果线程池中的所有线程都被占用,则期货将排队。因此,您可能希望将操作分解为多个未来,以便即使在完全使用的情况下,也可以在旧工作完成之前安排一些新工作。
def recurseFuture[A](entryFuture: Future[A], handler: A => A, exitCondition: A => Boolean, maxNestedRecursion: Long = Long.MaxValue): Future[A] = {
def recurse(a:A, handler: A => A, exitCondition: A => Boolean, maxNestedRecursion: Long, currentStep: Long): Future[A] = {
if (exitCondition(a))
Promise.successful(a)
else
if (currentStep==maxNestedRecursion)
Promise.successful(handler(a)).flatMap(a => recurse(a,handler,exitCondition,maxNestedRecursion,0))
else{
recurse(handler(a),handler,exitCondition,maxNestedRecursion,currentStep+1)
}
}
entryFuture.flatMap { a => recurse(a,handler,exitCondition,maxNestedRecursion,0) }
}
为了测试目的,我增强了我的处理程序方法:
val handler: Int => Int = (a: Int) => {
val result = a / 2
println("Additional step done: the current a is " + result + " on thread " + Thread.currentThread().getName)
result
}
方法 1:在自身上递归处理程序,以便在单个线程上执行所有操作。
println("Starting strategy with all the steps on the same thread")
val deepestRecursion: Future[Int] = recurseFuture(baseFuture,handler, exitCondition)
Await.result(deepestRecursion, Duration.Inf)
println("Completed strategy with all the steps on the same thread")
println("")
方法 2:在有限的深度上递归处理程序本身
println("Starting strategy with the steps grouped by three")
val threeStepsInSameFuture: Future[Int] = recurseFuture(baseFuture,handler, exitCondition,3)
val threeStepsInSameFuture2: Future[Int] = recurseFuture(baseFuture,handler, exitCondition,4)
Await.result(threeStepsInSameFuture, Duration.Inf)
Await.result(threeStepsInSameFuture2, Duration.Inf)
println("Completed strategy with all the steps grouped by three")
executorService.shutdown()