这似乎是另一个问题的重复。所以我会重复我的答案
Actor 一次处理一条消息。处理多条消息的经典模式是在一组消费者参与者的前面设置一个协调参与者。如果你使用 react,那么消费者池可能会很大,但仍然只会使用少量的 JVM 线程。这是一个示例,我创建了一个由 10 个消费者组成的池,并为他们创建了一个协调员。
import scala.actors.Actor
import scala.actors.Actor._
case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop
def consumer(n : Int) = actor {
loop {
react {
case Ready(sender) =>
sender ! Ready(self)
case Request(sender, payload) =>
println("request to consumer " + n + " with " + payload)
// some silly computation so the process takes awhile
val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
sender ! Result(result)
println("consumer " + n + " is done processing " + result )
case Stop => exit
}
}
}
// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)
val coordinator = actor {
loop {
react {
case msg @ Request(sender, payload) =>
consumers foreach {_ ! Ready(self)}
react {
// send the request to the first available consumer
case Ready(consumer) => consumer ! msg
}
case Stop =>
consumers foreach {_ ! Stop}
exit
}
}
}
// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)
此代码测试以查看哪个消费者可用并向该消费者发送请求。替代方案是随机分配给消费者或使用循环调度程序。
根据您所做的事情,Scala 的 Futures 可能会为您提供更好的服务。例如,如果你真的不需要演员,那么上述所有机器都可以写成
import scala.actors.Futures._
def transform(payload : String) = {
val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
println("transformed " + payload + " to " + result )
result
}
val results = for (i <- 0 to 1000) yield future(transform(i.toString))