3

生产者演员可以将消息发布给另一个演员以立即处理吗?即向消费者邮箱的头部而不是消费者邮箱的尾部发布消息?

我知道 akka 提供了一种配置我自己定义的邮箱类型的方法,但是如何控制是否需要将某些类型的消息发布在邮箱的头部而不是尾部。例如TimerMessages。我想要一个时间窗口实现的精确计时器控制。消息必须仅保留 1000 毫秒(例如),并且如果消息处理消耗时间并且邮箱中有许多待处理的消息,我不希望将计时器消息附加到同一个队列中。

我可以使用 a PriorityMailBox,但问题PriorityMailBox是即使它可以将更高优先级的消息(计时器消息)放在 MailBox 的头部,对于相同优先级的消息,不能保证 MailBox 中的消息顺序与 order 相同的到来。所以我也不能使用priorityMailBox。

有人可以告诉我如何实现这种行为吗?

4

2 回答 2

4

您可以使用自己的PriorityMailBox来处理消息的到达时间并将其用作附加优先级(对于具有相同“主要”优先级的消息)。

像这样的东西(未经测试):

import akka.dispatch._
import com.typesafe.config.Config
import akka.actor.{ActorRef, PoisonPill, ActorSystem}
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue

class MyTimedPriorityMailbox(settings: ActorSystem.Settings, config: Config)
  extends UnboundedTimedPriorityMailbox(
    TimedPriorityGenerator {
      case 'highpriority ⇒ 0

      case 'lowpriority  ⇒ 2

      case PoisonPill    ⇒ 3

      case otherwise     ⇒ 1
    })

case class TimedEnvelope(envelope: Envelope) {
  private val _timestamp = System.nanoTime()
  def timestamp = _timestamp
}

class UnboundedTimedPriorityMailbox( final val cmp: Comparator[TimedEnvelope], final val initialCapacity: Int) extends MailboxType {
  def this(cmp: Comparator[TimedEnvelope]) = this(cmp, 11)
  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new PriorityBlockingQueue[TimedEnvelope](initialCapacity, cmp) with TimedQueueBasedMessageQueue with TimedUnboundedMessageQueueSemantics {
      override def queue: java.util.Queue[TimedEnvelope] = this
    }
}

trait TimedQueueBasedMessageQueue extends MessageQueue {
  def queue: java.util.Queue[TimedEnvelope]
  def numberOfMessages = queue.size
  def hasMessages = !queue.isEmpty
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue) {
    if (hasMessages) {
      var envelope = dequeue()
      while (envelope ne null) {
        deadLetters.enqueue(owner, envelope)
        envelope = dequeue()
      }
    }
  }
}

trait TimedUnboundedMessageQueueSemantics extends TimedQueueBasedMessageQueue {
  def enqueue(receiver: ActorRef, handle: Envelope) { queue add TimedEnvelope(handle) }
  def dequeue(): Envelope = Option(queue.poll()).map(_.envelope).getOrElse(null)
}


object TimedPriorityGenerator {
  def apply(priorityFunction: Any ⇒ Int): TimedPriorityGenerator = new TimedPriorityGenerator {
    def gen(message: Any): Int = priorityFunction(message)
  }
}


abstract class TimedPriorityGenerator extends java.util.Comparator[TimedEnvelope] {
  def gen(message: Any): Int

  final def compare(thisMessage: TimedEnvelope, thatMessage: TimedEnvelope): Int = {
    val result = gen(thisMessage.envelope.message) - gen(thatMessage.envelope.message)
    // Int.MaxValue / Int.MinValue check omitted
    if(result == 0) (thisMessage.timestamp - thatMessage.timestamp).toInt else result
  }

}
于 2013-03-11T12:22:17.297 回答
2

上面的代码工作正常。

只有一个细节。避免使用 System.getTimeNano()。它在多核机器中存在问题,因为它是由 per-cpu 逻辑定义的

这里是另一个帖子

然后,我们在消息顺序中有一个奇怪的行为取决于哪个 cpu enque it。

我用经典的 System.currentTimeMillis() 改变它。它不太精确,但在我们的例子中,如果两条消息具有相同的优先级和相同的毫秒生成时间,则不要关心它们被处理的顺序。

感谢您的代码!

于 2013-08-06T15:48:13.380 回答