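For me, the key was discovering that any (?) Scala code can behave as an Actor: Actor.self gives the calling thread an implicit actor, so you can block on its mailbox with Actor.receive { }. (Actor.react { } never returns to its caller, so it suits the helper actors rather than the synchronous method.) Here is my source code: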
import scala.actors._
import scala.actors.Actor._

//Top-level class that wants to return the first-completed result from some long-running actors
class ConcurrentQuerier() {
  //Synchronous function; perhaps fulfilling some legacy interface
  def synchronousQuery : String = {
    //Instantiate and start the monitoring Actor
    val progressReporter = new ProgressReporter(self) //Actor.self: the calling thread's implicit actor
    progressReporter.start()

    //Instantiate the long-running Actors, giving each a handle to the monitor
    val lrfs = List(
      new LongRunningFunction(0, 2000, progressReporter),
      new LongRunningFunction(1, 2500, progressReporter),
      new LongRunningFunction(3, 1500, progressReporter),
      new LongRunningFunction(4, 1495, progressReporter),
      new LongRunningFunction(5, 1500, progressReporter),
      new LongRunningFunction(6, 5000, progressReporter)
    )

    //Start 'em (foreach, since start() is called only for its side effect)
    lrfs.foreach { lrf =>
      lrf.start()
    }
    println("All actors started...")
    val start = System.currentTimeMillis()

    /*
    This blocks until it receives a String in the Inbox.
    Who sends the string? A: the progressReporter, which is monitoring the LongRunningFunctions
    */
    val s = receive {
      case s:String => s
    }
    println("Received " + s + " after " + (System.currentTimeMillis() - start) + " ms")
    s
  }
}

/*
An Actor that reacts to a message that is a tuple ("COMPLETED", someResult) and sends the
result to this Actor's owner. Not strictly necessary (the LongRunningFunctions could post
directly to the owner's mailbox), but I like the idea that monitoring is important enough
to deserve its own object
*/
class ProgressReporter(val owner : Actor) extends Actor {
  def act() = {
    println("progressReporter awaiting news...")
    react {
      case ("COMPLETED", s) =>
        //Forward the first result to the owner; act() is not re-entered, so later results are ignored
        println("progressReporter received a completed signal " + s)
        owner ! s
      case s =>
        //Anything else: report it and keep waiting
        println("Unexpected message: " + s)
        act()
    }
  }
}

/*
Some long running function
*/
class LongRunningFunction(val id : Int, val timeout : Int, val supervisor : Actor) extends Actor {
  def act() = {
    //Do the long-running query
    val s = longRunningQuery()
    println(id.toString + " finished, sending results")
    //Send the results back to the monitoring Actor (the progressReporter)
    supervisor ! ("COMPLETED", s)
  }

  def longRunningQuery() : String = {
    println("Starting Agent " + id + " with timeout " + timeout)
    Thread.sleep(timeout)
    "Query result from agent " + id
  }
}

val cq = new ConcurrentQuerier()
//I don't think the Actor semantics guarantee that the result is absolutely, positively the first to have posted the "COMPLETED" message
println("Among the first to finish was : " + cq.synchronousQuery)
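For comparison, the same "block until the first result arrives" idea can be sketched without scala.actors, using the standard library's Future.firstCompletedOf and Await.result. This is a different technique from the listing above, not a port of it. The names FirstCompletedSketch, the (id, delayMs) job pairs, and the 10-second timeout are assumptions made up for illustration.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstCompletedSketch {
  // Hypothetical stand-in for the long-running query in the listing above
  def longRunningQuery(id: Int, delayMs: Long): String = {
    Thread.sleep(delayMs)
    "Query result from agent " + id
  }

  // Synchronous facade: blocks the caller until the first future completes
  def synchronousQuery(jobs: Seq[(Int, Long)]): String = {
    val futures = jobs.map { case (id, delayMs) =>
      // blocking { } hints to the global pool that this task sleeps,
      // so it may add threads instead of queueing the other jobs
      Future(blocking { longRunningQuery(id, delayMs) })
    }
    // Assumed timeout of 10 seconds; Await.result throws if nothing completes in time
    Await.result(Future.firstCompletedOf(futures), 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    val jobs = Seq((0, 400L), (1, 50L), (2, 800L))
    println("First to finish: " + synchronousQuery(jobs))
  }
}
```

Note that firstCompletedOf completes with whichever future finishes first, including one that fails, so a real caller would probably want to handle exceptions as well.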
Typical output looks like this:
scala ActorsNoSpin.scala
progressReporter awaiting news...
All actors started...
Starting Agent 1 with timeout 2500
Starting Agent 5 with timeout 1500
Starting Agent 3 with timeout 1500
Starting Agent 4 with timeout 1495
Starting Agent 6 with timeout 5000
Starting Agent 0 with timeout 2000
4 finished, sending results
progressReporter received a completed signal Query result from agent 4
Received Query result from agent 4 after 1499 ms
Among the first to finish was : Query result from agent 4
5 finished, sending results
3 finished, sending results
0 finished, sending results
1 finished, sending results
6 finished, sending results
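The trailing "finished, sending results" lines come from the slower agents: they still run to completion, but the reporter only handles one COMPLETED message, so their results are never read. A rough sketch of that behaviour, using a plain java.util.concurrent.LinkedBlockingQueue as a stand-in for the actor mailbox (MailboxSketch, firstOf, and the job delays are hypothetical names and values, not part of the original code):

```scala
import java.util.concurrent.LinkedBlockingQueue

object MailboxSketch {
  // Returns (first result taken, number of later results left unread in the mailbox)
  def firstOf(jobs: Seq[(Int, Long)]): (String, Int) = {
    val mailbox = new LinkedBlockingQueue[String]()

    // One thread per job; each posts its result to the shared mailbox when done
    val workers = jobs.map { case (id, delayMs) =>
      val t = new Thread(new Runnable {
        def run(): Unit = {
          Thread.sleep(delayMs)
          mailbox.put("Query result from agent " + id)
        }
      })
      t.start()
      t
    }

    // Blocks until something arrives, much like receive { case s: String => s }
    val first = mailbox.take()

    // The slower workers still finish and post, but nobody reads their messages
    workers.foreach(_.join())
    (first, mailbox.size)
  }

  def main(args: Array[String]): Unit = {
    val (first, unread) = firstOf(Seq((0, 300L), (1, 50L), (2, 600L)))
    println("Took: " + first + "; unread results: " + unread)
  }
}
```

When two jobs finish within a few milliseconds of each other (like agents 4, 3 and 5 above), which one lands in the mailbox first depends on thread scheduling, which is why the result is only "among the first".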