4

我目前正在与 scala 中的两个演员一起工作。一,生产者,生产一些数据并将其发送到一个parcer。生产者HashMap[String,HashMap[Object,List[Int]]]通过一条消息发送一个(连同这个来标记发送者):

parcer ! (this,data)

解析器一直在等待这样的消息:

def act(){
    loop{
      react{
        case (producer, data)=> parse(data);
      }
    }
}

该程序在正常情况下完美运行。问题来自大量数据和发送的许多消息(散列大约有 10^4 个元素,内部散列大约有 100 个元素,列表长度为 100),程序崩溃。它没有显示错误或异常。它只是停止。

问题似乎是我的生产者比解析器工作得快得多(目前我不想要一个以上的解析器)。

阅读scala 邮箱大小限制后,我想知道我的解析器邮箱是否达到了它的限制。该帖子还提供了一些解决方案,但我首先需要确保这是问题所在。我该如何测试呢?

有没有办法知道演员的内存限制?如何读取邮箱中的已用/空闲内存?

也欢迎任何未在该链接中发布的工作流程建议 。

谢谢,

4

1 回答 1

4

首先,您不需要显式传递发送者,因为发送者无论如何都会被 Scala 演员框架跟踪。您始终可以使用 方法访问消息的发件人sender

从这里可以看出:scala.actors.MQueue,一个actor的邮箱被实现为一个链表,因此只受堆大小的限制。

不过,如果您担心生产者非常快而消费者非常慢,我建议您探索一种节流机制。但我不推荐从接受的问题scala mail size limit的答案中的方法。

通常,在系统压力很大时尝试发送过载消息似乎不是一个好主意。如果您的系统太忙而无法检查过载怎么办?如果过载消息的接收者太忙而无法对其采取行动怎么办?此外,删除消息对我来说听起来不是一个好主意。我认为您希望可靠地处理所有工作项。

另外,我不会依靠mailboxSize来确定负载。您无法区分不同的消息类型,您只能从消费者内部检查,而不是从生产者检查。

我建议使用一种方法,当消费者知道自己可以处理时,他会要求更多的工作。

下面是一个如何实现它的简单示例。

import scala.actors._
import Actor._

object ConsumerProducer {
  def main(args: Array[String]) {
    val producer = new Producer(Iterator.range(0, 10000))
    val consumer = new Consumer(producer)
  }
}

case class Produce(count: Int)
case object Finished

class Producer[T](source: Iterator[T]) extends Actor {

  start

  def act() {
    loopWhile(source.hasNext) {
      react {
        case Produce(n: Int) => produce(n)
      } 
    }
  }

  def produce(n: Int) {
    println("producing " + n)
    var remaining = n
    source takeWhile(_ => remaining > 0) foreach { x => sender ! x; remaining -= 1 }
    if(!source.hasNext) sender ! Finished
  }
}

class Consumer(producer: Actor) extends Actor {

  start

  private var remaining = 0

  def act() {
    requestWork()
    consume()
  }

  def consume(): Nothing = react {
    case Finished => println("Finished")
    case n: Int => work(n); requestWork(); consume()
  }

  def requestWork() = if(remaining < 5) { remaining += 10; producer ! Produce(10) }

  def work(n: Int) = {
    println(n + ": " + (0 until 10000).foldLeft(0) { (acc, x) => acc + x * n })
    remaining -= 1
  }
}
于 2010-07-03T11:07:05.273 回答