我需要在给定的毫秒内尽可能多地执行相同的任务。所以我使用 scala.actors.threadpool.Executors 作为以下代码。
import actors.threadpool.{TimeUnit, Future, Executors}
import collection.mutable.ListBuffer
object TaskRepeater {
def repeat(task: Runnable, millisecs: Long): Int = {
val exector = Executors.newCachedThreadPool
val remover = Executors.newCachedThreadPool
val queue = ListBuffer[Future]()
def removeDone() {
(queue filter (_.isDone)) foreach { f =>
queue.remove(queue indexOf f)
}
}
val lock = new EasyLock()
val maxQueueSize = scala.collection.parallel.availableProcessors
val queueAvaiable = lock.mkCondition(queue.size < maxQueueSize)
var cnt = 0
val start = System.nanoTime()
while(System.nanoTime() - start < millisecs * 1000000) {
lock {
queueAvaiable.waitUntilFulfiled()
cnt += 1
val r = exector.submit(task)
queue += r
remover.submit(runneble {
r.get()
lock {
removeDone()
queueAvaiable.signalIfFulfilled()
}
})
}
}
exector.shutdown()
remover.shutdown()
assert(exector.awaitTermination(1, TimeUnit.SECONDS))
cnt
}
def runneble(f: => Unit) = new Runnable {
def run() {
f
}
}
}
import actors.threadpool.locks.{Condition, ReentrantLock}
class EasyLock {
val lock = new ReentrantLock
def mkCondition(f: => Boolean): EasyCondition = {
new EasyCondition(lock.newCondition(), f)
}
def apply(f: => Unit) {
lock.lock()
try {
f
} finally {
lock.unlock()
}
}
}
class EasyCondition(c: Condition, condition: => Boolean) {
def waitUntilFulfiled(f: => Unit) {
while (!condition) {
c.await()
}
f
}
def signalAllIfFulfilled() {
if (condition) c.signalAll()
}
def signalIfFulfilled() {
if (condition) c.signal()
}
}
但这有点复杂。相反,我认为 Twitter 的 Broker 和一些与 Broker 连接并将完成的任务发送给 Broker(如果存在)的 Executor 可以使其更容易。
以下是伪代码
val b = new Broker
val e = new Executor // connects with the broker anyway
e.runTask
val o = b.recv
o.sync() // wait until the task finishes
有没有这样的执行者?