首先,您不需要显式传递发送者,因为发送者无论如何都会被 Scala 演员框架跟踪。您始终可以使用 方法访问消息的发件人sender
。
从这里可以看出:scala.actors.MQueue,一个actor的邮箱被实现为一个链表,因此只受堆大小的限制。
不过,如果您担心生产者非常快而消费者非常慢,我建议您探索一种节流机制。但我不推荐从接受的问题scala mail size limit的答案中的方法。
通常,在系统压力很大时尝试发送过载消息似乎不是一个好主意。如果您的系统太忙而无法检查过载怎么办?如果过载消息的接收者太忙而无法对其采取行动怎么办?此外,删除消息对我来说听起来不是一个好主意。我认为您希望可靠地处理所有工作项。
另外,我不会依靠mailboxSize
来确定负载。您无法区分不同的消息类型,您只能从消费者内部检查,而不是从生产者检查。
我建议使用一种方法,当消费者知道自己可以处理时,他会要求更多的工作。
下面是一个如何实现它的简单示例。
import scala.actors._
import Actor._
object ConsumerProducer {
def main(args: Array[String]) {
val producer = new Producer(Iterator.range(0, 10000))
val consumer = new Consumer(producer)
}
}
case class Produce(count: Int)
case object Finished
class Producer[T](source: Iterator[T]) extends Actor {
start
def act() {
loopWhile(source.hasNext) {
react {
case Produce(n: Int) => produce(n)
}
}
}
def produce(n: Int) {
println("producing " + n)
var remaining = n
source takeWhile(_ => remaining > 0) foreach { x => sender ! x; remaining -= 1 }
if(!source.hasNext) sender ! Finished
}
}
class Consumer(producer: Actor) extends Actor {
start
private var remaining = 0
def act() {
requestWork()
consume()
}
def consume(): Nothing = react {
case Finished => println("Finished")
case n: Int => work(n); requestWork(); consume()
}
def requestWork() = if(remaining < 5) { remaining += 10; producer ! Produce(10) }
def work(n: Int) = {
println(n + ": " + (0 until 10000).foldLeft(0) { (acc, x) => acc + x * n })
remaining -= 1
}
}