0

我正在研究一种分布式算法,并决定使用 Akka 跨机器扩展它。机器需要非常频繁地交换消息,这些消息引用了每台机器上存在的一些不可变对象。因此,从共享的复制对象不应在消息中序列化的意义上“压缩”消息似乎是明智的。这不仅可以节省网络带宽,而且还可以避免在反序列化消息时在接收方创建重复的对象。

现在,我的问题是如何正确地做到这一点。到目前为止,我可以想到两个选择:

  1. 在“业务层”上处理这个问题,即将我的原始消息对象转换为一些引用对象,这些引用对象用一些符号引用替换对共享、复制对象的引用。然后,我会发送那些引用对象而不是原始消息。将其视为用 URL 替换一些实际的 Web 资源。这样做在编码方面似乎相当直接,但它也会将序列化问题拖入实际的业务逻辑中。

  2. 编写了解共享的复制对象的自定义序列化程序。就我而言,这个解决方案可以通过序列化器将复制的共享对象作为全局状态引入参与者系统。但是,Akka 文档没有描述如何以编程方式添加自定义序列化程序,这是使用序列化程序编织共享对象所必需的。另外,我可以想象有几个原因,为什么不鼓励这种解决方案。所以,我在这里问。

非常感谢!

4

2 回答 2

1

可以编写自己的自定义序列化程序并让它们做各种奇怪的事情,然后你可以像往常一样在配置级别绑定它们:

class MyOwnSerializer extends Serializer {

  // If you need logging here, introduce a constructor that takes an ExtendedActorSystem.
  // class MyOwnSerializer(actorSystem: ExtendedActorSystem) extends Serializer
  // Get a logger using:
  // private val logger = Logging(actorSystem, this)

  // This is whether "fromBinary" requires a "clazz" or not
  def includeManifest: Boolean = true

  // Pick a unique identifier for your Serializer,
  // you've got a couple of billions to choose from,
  // 0 - 40 is reserved by Akka itself
  def identifier = 1234567

  // "toBinary" serializes the given object to an Array of Bytes
  def toBinary(obj: AnyRef): Array[Byte] = {
    // Put the code that serializes the object here
    //#...
    Array[Byte]()
    //#...
  }

  // "fromBinary" deserializes the given array,
  // using the type hint (if any, see "includeManifest" above)
  def fromBinary(
    bytes: Array[Byte],
    clazz: Option[Class[_]]): AnyRef = {
    // Put your code that deserializes here
    //#...
    null
    //#...
  }
}

但这提出了一个重要的问题:如果您的消息都引用了已经在机器上共享的数据,为什么要在消息中放入指向对象的指针(非常糟糕!消息应该是不可变的,而指针不是!),而不是某种不可变的字符串objectId(有点你的选择1)?在保持消息的不变性方面,这是一个更好的选择,并且您的业务逻辑几乎没有变化(只需在共享状态存储上放置一个包装器)

有关更多信息,请参阅文档

于 2017-06-21T11:39:40.830 回答
0

我终于接受了Diego提出的解决方案,并想分享一些关于我的推理和解决方案的更多细节。

首先,出于以下原因,我也赞成选项 1(处理业务层中消息的“压缩”):

  1. 序列化器对于actor系统是全局的。使它们有状态实际上是对 Akka 哲学的最严重的违反,因为它违背了对行为者的行为和状态的封装。
  2. 无论如何,必须预先创建序列化程序(即使“以编程方式”添加它们)。
  3. 在设计方面,人们可以争辩说“消息压缩也不是序列化程序的责任。从严格意义上讲,序列化只是将运行时特定的数据转换为紧凑的、可交换的表示。改变要序列化的内容并不是不过,序列化程序的任务。

确定了这一点后,我仍然努力将“消息压缩”与参与者中的实际业务逻辑明确分离。我想出了一个在 Scala 中执行此操作的巧妙方法,我想在这里分享。基本思想是使消息本身看起来像一个普通的案例类,但仍然允许这些消息“压缩”自己。这是一个抽象的例子:

class Sender extends ActorRef {
   def context: SharedContext = ... // This is the shared data present on every node.

   // ...

   def someBusinessLogic(receiver: ActorRef) {
     val someData = computeData
     receiver ! MyMessage(someData)
   }
}

class Receiver extends ActorRef {
   implicit def context: SharedContext = ... // This is the shared data present on every node.

   def receiver = {
     case MyMessage(someData) =>
       // ...
   }
}

object Receiver {
  object MyMessage {
    def apply(someData: SomeData) = MyCompactMessage(someData: SomeData)
    def unapply(myCompactMessage: MyCompactMessage)(implicit context: SharedContext)
    : Option[SomeData] =
      Some(myCompactMessage.someData(context))
  }
}

如您所见,发送方和接收方代码感觉就像使用案例类,实际上MyMessage可能是案例类。但是,通过手动实现applyunapply可以插入自己的“压缩”逻辑,也可以隐式注入执行“解压缩”所需的共享数据,而无需触及发送方和接收方。对于定义MyCompactMessage,我发现 Protocol Buffers 特别适合,因为它已经是 Akka 的依赖项,并且在空间和计算方面非常高效,但任何其他解决方案都可以。

于 2017-06-23T07:32:54.723 回答