我熟悉 Erlang/Elixir,其中进程邮箱中的消息保留在邮箱中,直到它们匹配:
这些模式
Pattern
按时间顺序与邮箱中的第一条消息顺序匹配,然后是第二条,依此类推。如果匹配成功并且可选的保护序列GuardSeq
为真,Body
则评估对应的。匹配的消息被消费,即从邮箱中删除,而邮箱中的任何其他消息保持不变。
(http://erlang.org/doc/reference_manual/expressions.html#receive)
但是,对于 Akka Actor,不匹配的消息会从邮箱中删除。例如,在哲学家就餐模拟中实现分叉时,这很烦人:
import akka.actor._
object Fork {
def props(id: Int): Props = Props(new Fork(id))
final case class Take(philosopher: Int)
final case class Release(philosopher: Int)
final case class TookFork(fork: Int)
final case class ReleasedFork(fork: Int)
}
class Fork(val id: Int) extends Actor {
import Fork._
object Status extends Enumeration {
val FREE, TAKEN = Value
}
private var _status: Status.Value = Status.FREE
private var _held_by: Int = -1
def receive = {
case Take(philosopher) if _status == Status.FREE => {
println(s"\tPhilosopher $philosopher takes fork $id.")
take(philosopher)
sender() ! TookFork(id)
context.become(taken, false)
}
case Release(philosopher) if _status == Status.TAKEN && _held_by == philosopher => {
println(s"\tPhilosopher $philosopher puts down fork $id.")
release()
sender() ! ReleasedFork(id)
context.unbecome()
}
}
def take(philosopher: Int) = {
_status = Status.TAKEN
_held_by = philosopher
}
def release() = {
_status = Status.FREE
_held_by = -1
}
}
当一条Take(<philosopher>)
消息被发送到分叉时,我们希望消息一直留在邮箱中,直到分叉被释放并且消息被匹配。但是,在 AkkaTake(<philosopher>)
中,如果当前采用分叉,则从邮箱中删除消息,因为没有匹配项。
目前,我通过重写unhandled
Fork actor 的方法并将消息再次转发到 fork 来解决这个问题:
override def unhandled(message: Any): Unit = {
self forward message
}
我相信这是非常低效的,因为它会一直将消息发送到分叉,直到匹配为止。有没有另一种方法来解决这个问题,不涉及不断转发不匹配的消息?
我相信在最坏的情况下,我将不得不实现一个模仿 Erlang 邮箱的自定义邮箱类型,如下所述:http: //ndpar.blogspot.com/2010/11/erlang-explained-selective-receive.html
编辑:我根据 Tim 的建议修改了我的实现,并按照建议使用了 Stash 特征。我的Fork
演员现在看起来如下:
class Fork(val id: Int) extends Actor with Stash {
import Fork._
// Fork is in "taken" state
def taken(philosopher: Int): Receive = {
case Release(`philosopher`) => {
println(s"\tPhilosopher $philosopher puts down fork $id.")
sender() ! ReleasedFork(id)
unstashAll()
context.unbecome()
}
case Take(_) => stash()
}
// Fork is in "free" state
def receive = {
case Take(philosopher) => {
println(s"\tPhilosopher $philosopher takes fork $id.")
sender() ! TookFork(id)
context.become(taken(philosopher), false)
}
}
}
但是,我不想到处写stash()
andunstashAll()
调用。相反,我想实现一个自定义邮箱类型来为我执行此操作,即存储未处理的消息并在参与者处理消息时取消存储它们。这可能吗?
我尝试实现一个自定义邮箱来执行此操作,但是,我无法确定消息是否与接收块匹配。