我正在尝试使用标准的 scala.actors 包为 Scala 设计一个调度员-工作者演员模式。
调度程序从 a 接收工作java.util.concurrent.LinkedBlockingQueue
并将其发送给工作参与者进行处理。完成所有工作后,调度员应该告诉每个工人退出,然后它也应该退出。这是我想出的代码,但是当所有工作完成后它就会挂起(我认为'GiveMeWork
调度程序的队列中有待处理的消息):
import java.util.concurrent.LinkedBlockingQueue
import scala.actors.Actor
object Dispatcher
extends Actor {
println("Dispatcher created")
def act() {
val workers = (1 to 4).map(id => (new Worker(id)).start())
loop {
react {
case 'GiveMeWork =>
// println("Worker asked for work")
val (time, i) = workQueue.take()
if (time == 0) {
println("Quitting time")
workers.foreach(_ !? 0L)
} else {
println("Arrival at dispatcher: i: " + i + " dispatch time: " +
time + ", elapsed: " + (System.nanoTime() - time))
sender ! time
}
case 'Quit =>
println("Told to quit")
sender ! 'OffDuty
exit()
}
}
}
}
class Worker(id: Int)
extends Actor {
println("Worker(" + id + ") created")
var jobs = 0
def act() {
Dispatcher ! 'GiveMeWork
loop {
react {
case time: Long =>
if (time == 0) {
println("Worker(" + id + ") completed " + jobs + " jobs")
sender ! 'OffDuty
exit()
} else {
println("Arrival at worker(" + id + "): dispatch time: " +
time + ", elapsed: " + (System.nanoTime() - time))
Thread.sleep(id)
jobs += 1
Dispatcher ! 'GiveMeWork
}
}
}
}
}
val workQueue = new LinkedBlockingQueue[(Long, Int)](1000)
Dispatcher.start()
for (i <- 0 until 5000) {
Thread.sleep(1)
workQueue.put((System.nanoTime(), i))
}
workQueue.put((0L, 0))
println("Telling Dispatcher to quit")
Dispatcher !? 'Quit