如果演员的邮箱太满,我想从演员的邮箱中删除消息。例如,如果队列大小达到 1000 条消息,则应删除最旧的消息。
问问题
495 次
2 回答
6
您不能直接使用邮箱,但您可以在现有库之上实现消息过期模式。
随每条消息发送创建日期:
case class ExpirableMessage(msg: String, createdAt: Long)
用 扫描邮箱reactWithin(0)
,过滤掉过期邮件:
react{
case msg: ExpirableMessage =>
// handle the message
// clean the mailbox with nested react
reactWithin(0){
case ExpirableMessage(_, createdAt) if(currentTimeMillis - createdAt > INTERVAL) =>
case TIMEOUT =>
}
}
于 2011-05-20T11:36:35.657 回答
3
您还可以在堆上具体化参与者的队列,并通过使用代理参与者来限制其利用率。然后你可以写如下内容:
// adder actor with a bounded queue size of 4
val adder = boundActor(4) {
loop {
react {
case x: Int => reply(x*2)
}
}
}
// test the adder
actor {
for (i <- 1 to 10) {
adder !! (i, { case answer: Int => println("Computed " + i + " -> " + answer) })
}
}
这是boundedActor的实现。请注意,boundedActor 必须始终回复其发送者,否则无法跟踪其队列大小,并且 boundedActor 将冻结拒绝接受任何进一步的消息。
object ActorProxy extends scala.App {
import scala.actors._
import scala.actors.Actor._
import scala.collection.mutable._
/**
* Accepts an actor and a message queue size, and
* returns a proxy that drops messages if the queue
* size of the target actor exceeds the given queue size.
*/
def boundActorQueue(target: Actor, maxQueueLength: Int) = actor {
val queue = new Queue[Tuple2[Any, OutputChannel[Any]]]
var lastMessageSender: Option[OutputChannel[Any]] = None
def replyHandler(response: Any) {
if (lastMessageSender.get != null) lastMessageSender.get ! response
if (queue.isEmpty) {
lastMessageSender = None
} else {
val (message, messageSender) = queue.dequeue
forwardMessage(message, messageSender)
}
}
def forwardMessage(message: Any, messageSender: OutputChannel[Any]) = {
lastMessageSender = Some(messageSender)
target !! (message, { case response => replyHandler(response) })
}
loop {
react {
case message =>
if (lastMessageSender == None) {
forwardMessage(message, sender)
} else {
queue.enqueue((message, sender))
// Restrict the queue size
if (queue.length > maxQueueLength) {
val dropped = queue.dequeue
println("!!!!!!!! Dropped message " + dropped._1)
}
}
}
}
}
// Helper method
def boundActor(maxQueueLength: Int)(body: => Unit): Actor = boundActorQueue(actor(body), maxQueueLength)
}
于 2011-05-22T03:57:31.877 回答