我需要将不同类型的消息发布到事件流,这些消息应该有不同的优先级,例如,如果已经发布了 10 条 A 类型的消息,并且毕竟发布了一条 B 类型的消息,并且 B 的优先级高于A 的优先级 - 即使队列中有 10 条 A 类型的消息,消息 B 也应该由下一个参与者获取。
我在这里阅读了有关优先消息的信息,并创建了该邮箱的简单实现:
class PrioritizedMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(
PriorityGenerator {
case ServerPermanentlyDead => println("Priority:0"); 0
case ServerDead => println("Priority:1"); 1
case _ => println("Default priority"); 10
}
)
然后我在 application.conf 中配置它
akka {
actor {
prio-dispatcher {
type = "Dispatcher"
mailbox-type = "mailbox.PrioritizedMailbox"
}
}
}
并连接到我的演员:
private val myActor = actors.actorOf(
Props[MyEventHandler[T]].
withRouter(RoundRobinRouter(HIVE)).
withDispatcher("akka.actor.prio-dispatcher").
withCreator(
new Creator[Actor] {
def create() = new MyEventHandler(storage)
}), name = "eventHandler")
我正在使用 ActorSystem.eventStream.publish 来发送消息,并且我的演员订阅了它(我可以在日志中看到消息已被处理,但按 FIFO 顺序)。
但是看起来这还不够,因为在日志/控制台中我从未见过像“默认优先级”这样的消息。我在这里错过了什么吗?所描述的方法是否适用于事件流或仅适用于在参与者上发送消息的直接调用?以及如何使用 eventStream 获取优先消息?