
I have built a proper actor hierarchy for my Scala/Java application that relies mostly on fire-and-forget semantics. I am now faced with the need to pass unique mutable objects atomically between actors. In particular, I have three actors, A, B and C:

  A
 / \
B   C

B and C each have their own Map of unique objects and do not know about each other. At some point in time, B will decide that it needs to get rid of object O. I am looking for a mechanism that allows object O to be added to C's Map and removed from B's Map atomically.

B does not decide that it is C that will receive object O: all it does at first is send a disposal request to actor A. Actor A can allow or refuse the disposal request from B, and from there it may or may not introduce C to actor B so that they can conclude the transaction autonomously and atomically.

EDIT

My original question is ill-labeled and confused. In my system, messages are immutable: I send UUIDs between actors, not references to objects. These UUIDs are the keys to a private per-actor map of mutable objects. The use of the objects held by B and C in their private maps is meant to be mutually exclusive at any given point in time.

It is trivial for me to go one step further and make sure no mutable object is ever shared between B and C, that is, to make sure key K in B's map and key K in C's map point to different private mutable objects (say Ob and Oc) having the same UUID.

One goal is to avoid doing calculations on object Ob in B and Oc in C at the same time. This is not really a problem in itself (I don't mind wasting a few CPU cycles once in a while during a transition from actor B to actor C), but it becomes a problem since actors B and C report their simulation results to a 3rd party client that we can call D.

D does not know about the relationship between A, B and C, so it could receive results from both B and C about the same UUID without being able to tell which one it should listen to. Due to the nature of the simulation, these results could be different and conflicting. Of course, actor B could stop simulating object Ob and send a message to actor C telling it to start the simulation on object Oc. That would prevent client D from receiving messages from both B and C about the same object, but there would be a time frame during which this UUID could be absent from the simulation altogether. Perhaps this is not very important for my application, but I still have to verify this. The ideal for me would be a synchronized switch of actor for a UUID.


3 Answers


A two-phase commit protocol can be used to make the update atomic.

B -> A tell ("initiate moving of O B->C") 

A -> B tell ("prepare remove O") //
A -> C tell ("prepare add O") // (A selects C)

C changes O to prepared-to-add state
C -> A tell ("ready to add O")

B changes O to prepared-to-remove state.
B -> A tell ("ready to remove O")

A waits for two "ready" messages and then:
A -> B, C tell ("commit")

if A receives timeout, then
A -> B, C tell ("rollback")

For this protocol to work reliably, you need to implement an undo/redo log in B and C.
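
The message flow above could be sketched roughly as follows. This is only a single-threaded model in plain Scala, not Akka code: Participant, Coordinator, Vote and all method names are made up for illustration, the String value stands in for the mutable object, and the timeout path is reduced to "any refusal triggers a rollback". The pending map plays the role of a very small undo log, because nothing touches the real map until commit.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical vote type for the "ready" replies in the protocol.
sealed trait Vote
case object Ready extends Vote
case object Refuse extends Vote

// Stands in for actor B or C: a private map plus staged (not yet applied) changes.
final class Participant {
  val objects = mutable.Map.empty[UUID, String]
  // Staged change per key: Some(value) = add on commit, None = remove on commit.
  private val pending = mutable.Map.empty[UUID, Option[String]]

  def prepareRemove(key: UUID): Vote =
    if (objects.contains(key) && !pending.contains(key)) { pending(key) = None; Ready }
    else Refuse

  def prepareAdd(key: UUID, value: String): Vote =
    if (!objects.contains(key) && !pending.contains(key)) { pending(key) = Some(value); Ready }
    else Refuse

  // Apply the staged change, if any.
  def commit(key: UUID): Unit =
    pending.remove(key).foreach {
      case Some(value) => objects(key) = value
      case None        => objects.remove(key)
    }

  // Drop the staged change; the real map was never modified.
  def rollback(key: UUID): Unit = { pending.remove(key); () }
}

// Stands in for actor A: collects both votes, then commits or rolls back.
object Coordinator {
  def transfer(key: UUID, from: Participant, to: Participant): Boolean =
    from.objects.get(key) match {
      case None => false
      case Some(value) =>
        val votes = List(from.prepareRemove(key), to.prepareAdd(key, value))
        if (votes.forall(_ == Ready)) { from.commit(key); to.commit(key); true }
        else { from.rollback(key); to.rollback(key); false }
    }
}
```

With real actors the two votes arrive as asynchronous messages, so A would also need the timeout mentioned above, and B and C would have to persist their staged changes if they must survive a restart.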

answered 2013-09-08T12:27:43.717

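I believe you can accomplish what you want using Transactors.

Specifically, B and C should both mix in the Transactor trait. B should implement the coordinate method so that, when A sends it a message introducing C and telling it to go ahead and update its state, B invites C to participate in the transaction by calling include(C). Both actors should implement the atomically method to handle the actual update to their internal state (i.e. updating their maps).

Something like this: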

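import akka.actor.{Actor, ActorRef}
import akka.transactor.Transactor
import scala.collection.mutable

type Key = // your map key type
type Value = // your map value type

case class AtomicallyTransferObject(transferTo: ActorRef, key: Key, value: Value)

class B extends Actor with Transactor
{
   // Note: a plain mutable.Map is not rolled back by the STM if the transaction fails
   private val myMap = mutable.Map.empty[Key, Value]

   // Invite the receiving actor into the coordinated transaction
   override def coordinate = {
      case AtomicallyTransferObject(transferTo, _, _) => include(transferTo)
   }

   // B's half of the transaction: drop the entry
   def atomically = implicit txn => {
      case AtomicallyTransferObject(_, key, _) => myMap -= key
   }
}

class C extends Actor with Transactor
{
   private val myMap = mutable.Map.empty[Key, Value]

   // C's half of the transaction: store the entry
   def atomically = implicit txn => {
      case AtomicallyTransferObject(_, key, value) => myMap += (key -> value)
   }
}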

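Actor A will send the AtomicallyTransferObject message to B, which will cause B to invite C into the transaction; both actors will then process the message atomically.

More information on Transactors can be found here: http://doc.akka.io/docs/akka/current/scala/transactors.html

Note that for non-transactional messages you should implement normally rather than receive, because the Transactor trait provides an implementation of receive that delegates to normally and atomically.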

answered 2013-09-09T04:19:59.410

ScalaSTM may make the protocol easier to implement. I'm not quite sure, but the following may give some hints on how to achieve the goal:

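import akka.actor.{Actor, ActorRef}
import scala.collection.mutable
import scala.concurrent.stm._

class O { val owner = Ref(None: Option[ActorRef]) }

case class NotifyRemoved(o: O)
case class NotifyAdded(o: O)

class A extends Actor {
  val listOfTransferedObject = mutable.ListBuffer.empty[O]

  def move(o: O, to: ActorRef): Unit = {
    // the atomic block may be re-executed on conflict, so the sends below can repeat
    atomic { implicit txn =>
      val oldOwner = o.owner()
      o.owner() = Some(self)
      listOfTransferedObject += o
      oldOwner.get ! NotifyRemoved(o) // you may prefer to use `ask`
      o.owner() = Some(to)
      o.owner().get ! NotifyAdded(o) // you may prefer to use `ask`
    }
  }

  def receive = { case _ => } // message handling omitted in this sketch
}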

On receipt in B and C, you just update your internal map.
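
A minimal sketch of that receiving side, written in plain Scala without Akka so that it stands alone. OwnerState, SimObject and the handle method are hypothetical names; handle stands in for the body of B's or C's receive block, and the messages are assumed to carry an object that exposes its UUID.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical shapes: the answer only names NotifyRemoved and NotifyAdded.
final case class SimObject(id: UUID, var state: Int)
final case class NotifyRemoved(obj: SimObject)
final case class NotifyAdded(obj: SimObject)

// Stands in for the private state of actor B or C.
final class OwnerState {
  val objects = mutable.Map.empty[UUID, SimObject]

  // What the actor's receive block would do for each notification.
  def handle(msg: Any): Unit = msg match {
    case NotifyRemoved(o) =>
      objects.remove(o.id)
      ()
    case NotifyAdded(o) =>
      objects(o.id) = o
    case _ =>
      () // unrelated messages are ignored in this sketch
  }
}
```

Because the two notifications are separate asynchronous messages, the removal in B and the addition in C do not happen at the same instant; only the ownership change recorded in A is atomic.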

answered 2013-09-08T12:41:09.640