6

使用 Scala 2.8 RC1 或更新版本,什么是最好(最简单和/或最直接)的方法来“偷看”演员邮箱中的等待消息(从同一演员的 act() 方法中)以检查里面的内容队列,而不必以任何方式响应/接收消息和/或干扰邮箱的当前内容。

这样做的目的是让参与者可以通过首先确定是否有任何剩余的邮箱消息是必须处理的消息来确定处理退出请求是否安全,而不是通过立即停止参与者来丢弃。

4

2 回答 2

10

你不需要往前看。只需跟踪已请求退出的事实并使用 reactWithin(0) 来确定在请求退出后队列何时为空。

import scala.actors._

sealed case class Message
case object Exit extends Message
case class Unimportant(n:Int) extends Message
case class Important(n:Int) extends Message

class SafeExitingActor extends Actor {
  def act : Nothing = react {
      case Exit => {
           println("exit requested, clearing the queue")
           exitRequested
      }
      case message => {
           processMessage(message, false)
           act
      }
  }

  // reactWithin(0) gives a TIMEOUT as soon as the mailbox is empty
  def exitRequested : Nothing = reactWithin(0) {
     case Exit => {
         println("extra exit requested, ignoring")
         exitRequested // already know about the exit, keep processing
     }
     case TIMEOUT => {
         println("timeout, queue is empty, shutting down")
         exit // TIMEOUT so nothing more to process, we can shut down
     }
     case message => {
         processMessage(message, true)
         exitRequested
     }
  }

  // process is a separate method to avoid duplicating in act and exitRequested
  def processMessage(message : Any, importantOnly : Boolean) = {
     message match {
       case Unimportant(n) if !importantOnly => println("Unimportant " + n)
       case Unimportant(n) => () // do nothing
       case Important(n) => println("Important! " + n)
     }
     Thread sleep 100 // sleep a little to ensure mailbox backlog
  }
}

object TestProcessing {
  def main(args : Array[String]) {
    val actor = new SafeExitingActor()
    actor.start()
    for (i <- 1 to 10) {
        actor ! Unimportant(i)
        actor ! Important(i)
    }
    actor ! Exit
    for (i <- 11 to 20) {
        actor ! Unimportant(i)
        actor ! Important(i)
    }
    actor ! Exit
    actor ! Important(100)
  }
}

那应该输出

Unimportant 1
Important! 1
Unimportant 2
Important! 2
Unimportant 3
Important! 3
Unimportant 4
Important! 4
Unimportant 5
Important! 5
Unimportant 6
Important! 6
Unimportant 7
Important! 7
Unimportant 8
Important! 8
Unimportant 9
Important! 9
Unimportant 10
Important! 10
exit requested, clearing the queue
Important! 11
Important! 12
Important! 13
Important! 14
Important! 15
Important! 16
Important! 17
Important! 18
Important! 19
Important! 20
extra exit requested, ignoring
Important! 100
timeout, queue is empty, shutting down
于 2010-06-18T22:51:01.040 回答
3

一般来说,这听起来像是一个危险的操作,因为如果有关键消息,处理它们的参与者可能会检查并找不到任何消息,但在退出之前可能会从其他线程获得另一个消息。

如果您确定这不会发生,并且您不需要大量令人难以置信的快速消息切换,我可能会编写一个守卫角色来计算并跟踪关键消息,否则只是将它们传递给另一个处理的演员。

如果您不想这样做,请记住内部细节应该更改,并且您可能必须查看 Actor、Reactor、MessageQueue 等的源代码才能获得所需的内容。现在,这样的事情应该可以工作(警告,未经测试):

package scala.actors
package myveryownstuff
trait CriticalActor extends Actor {
  def criticalAwaits(p: Any => Boolean) = {
    synchronized {
      drainSendBuffer(mailbox)
      mailbox.get(0)(p).isDefined
    }
  }
}

请注意,我们必须将扩展特征放在 scala.actors 包中,因为所有邮箱内部都被声明为 scala.actors 包的私有。(这是一个很好的警告,在弄乱内部之前你应该小心。)然后我们添加一个新方法,该方法接受一个可以测试关键消息的函数,并使用mailbox.get(n)返回第nth 条消息的内置方法查找它。一些谓词。

于 2010-04-27T14:51:10.520 回答