4

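I have a moderate number of long-running Actors, and I want to write a synchronous function that returns the first one to complete. I can do this by spin-waiting on the futures (e.g.: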

while (! fs.exists(f => f.isSet) ) {
  Thread.sleep(100)
}
val completeds = fs.filter(f => f.isSet)
completeds.head()

), but that seems very un-Actor-like.

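scala.actors.Futures has two methods, awaitAll() and awaitEither(), that seem very close; if there were an awaitAny() I would jump on it. Am I missing a simple way to do this, or is there a common pattern that applies?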

2 Answers

2

A more "actor-like" way of waiting for completion is to create an actor in charge of handling completed results (let's call it ResultHandler).

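Instead of replying, the workers send their answers to ResultHandler in a fire-and-forget manner. The latter keeps processing results as the other workers finish their jobs.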
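The pattern described above could be sketched roughly as follows. This is only an illustration, assuming the legacy scala.actors library (bundled with Scala up to 2.10) and script-style top-level definitions; the `Completed` message, the `expected` counter, and the helpers `startWorker` and `firstResult` are hypothetical names introduced for the example.

```scala
import scala.actors.Actor
import scala.actors.Actor._

// Hypothetical message a worker sends when it finishes.
case class Completed(workerId: Int, result: String)

// Collects completions; forwards only the first one to the requester,
// then keeps draining the rest until every worker has reported.
class ResultHandler(requester: Actor, expected: Int) extends Actor {
  def act(): Unit = {
    var seen = 0
    loopWhile(seen < expected) {
      react {
        case Completed(_, result) =>
          seen += 1
          if (seen == 1) requester ! result
      }
    }
  }
}

// Fire-and-forget worker: sleeps to simulate work, then reports to the handler.
def startWorker(id: Int, delayMs: Long, handler: Actor): Actor = actor {
  Thread.sleep(delayMs)
  handler ! Completed(id, "result from worker " + id)
}

// Synchronous entry point: blocks the calling thread until the first result arrives.
def firstResult(delaysMs: Seq[Long]): String = {
  val handler = new ResultHandler(self, delaysMs.size)
  handler.start()
  delaysMs.zipWithIndex.foreach { case (d, i) => startWorker(i, d, handler) }
  receive { case s: String => s }
}
```

With clearly separated delays, for example `firstResult(Seq(300L, 50L, 200L))`, the worker with the shortest delay would be expected to win. The handler uses `react` rather than `receive`, so it does not hold a thread while it waits for the remaining workers.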

answered 2011-11-03T08:23:14.610
0

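For me, the key was discovering that any thread can implicitly act as an Actor (Actor.self gives the calling thread its own mailbox), so you can block with Actor.receive { }. Here is my source code: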

    import scala.actors._
    import scala.actors.Actor._

    //Top-level class that wants to return the first-completed result from some long-running actors
    class ConcurrentQuerier() {
        //Synchronous function; perhaps fulfilling some legacy interface
        def synchronousQuery : String = {
            //Instantiate and start the monitoring Actor
            val progressReporter = new ProgressReporter(self) //self is the Actor (or proxy) for the calling thread
            progressReporter.start()

            //Instantiate the long-running Actors, giving each a handle to the monitor
            val lrfs = List ( 
                new LongRunningFunction(0, 2000, progressReporter), new LongRunningFunction(1, 2500, progressReporter), new LongRunningFunction(3, 1500, progressReporter), 
                new LongRunningFunction(4, 1495, progressReporter), new LongRunningFunction(5, 1500, progressReporter), new LongRunningFunction(6, 5000, progressReporter) ) 

            //Start 'em
            lrfs.foreach(_.start())
            println("All actors started...")

            val start = System.currentTimeMillis()
            /* 
            This blocks until it receives a String in the Inbox.
            Who sends the string? A: the progressReporter, which is monitoring the LongRunningFunctions
            */ 
            val s = receive {
                  case s:String => s
            }
            println("Received " + s + " after " + (System.currentTimeMillis() - start) + " ms")
            s
        }
    }

    /* 
    An Actor that reacts to a message that is a tuple ("COMPLETED", someResult) and sends the
    result to this Actor's owner. Not strictly necessary (the LongRunningFunctions could post
    directly to the owner's mailbox), but I like the idea that monitoring is important enough
    to deserve its own object
    */
    class ProgressReporter(val owner : Actor) extends Actor {
        def act() = {
            println("progressReporter awaiting news...")
            react {
                case ("COMPLETED", s) => 
                    println("progressReporter received a completed signal " + s);
                    owner ! s
                case s => 
                    println("Unexpected message: " + s ); act()
            }
        }
    }

    /*
    Some long-running function
    */

    class LongRunningFunction(val id : Int, val timeout : Int, val supervisor : Actor) extends Actor {
        def act() = {
            //Do the long-running query
            val s = longRunningQuery()
            println(id.toString + " finished, sending results")
            //Send the results back to the monitoring Actor (the progressReporter)
            supervisor ! ("COMPLETED", s)
        }

        def longRunningQuery() : String = { 
            println("Starting Agent " + id + " with timeout " + timeout)
            Thread.sleep(timeout)
            "Query result from agent " + id
        }
    }


    val cq = new ConcurrentQuerier()
    //I don't think the Actor semantics guarantee that the result is absolutely, positively the first to have posted the "COMPLETED" message
    println("Among the first to finish was : " + cq.synchronousQuery)

Typical results look like this:

scala ActorsNoSpin.scala 
progressReporter awaiting news...
All actors started...
Starting Agent 1 with timeout 2500
Starting Agent 5 with timeout 1500
Starting Agent 3 with timeout 1500
Starting Agent 4 with timeout 1495
Starting Agent 6 with timeout 5000
Starting Agent 0 with timeout 2000
4 finished, sending results
progressReporter received a completed signal Query result from agent 4
Received Query result from agent 4 after 1499 ms
Among the first to finish was : Query result from agent 4
5 finished, sending results
3 finished, sending results 
0 finished, sending results
1 finished, sending results
6 finished, sending results
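
One caveat with the blocking `receive` in `synchronousQuery` is that it waits forever if no worker ever reports. A minimal sketch of a bounded wait, assuming the same legacy scala.actors library; `awaitFirst` and its `timeoutMs` parameter are hypothetical names, not part of the code above:

```scala
import scala.actors.Actor._
import scala.actors.TIMEOUT

// Hypothetical helper: waits up to timeoutMs for a String in the calling
// thread's mailbox; returns None if nothing arrives in time.
def awaitFirst(timeoutMs: Long): Option[String] =
  receiveWithin(timeoutMs) {
    case s: String => Some(s)
    case TIMEOUT   => None
  }
```

A caller could then decide how to handle the `None` case (retry, fail, or return a default) instead of hanging.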
answered 2011-11-03T19:11:12.280