0

我有一个持久的actor,它可以接收一种类型的命令Persist(event),其中事件是一种类型trait Event(它有很多实现)。成功后,这会回复Persisted(event)发件人。

事件本身是可序列化的,因为这是我们存储在持久性存储中的数据,并且序列化是使用自定义序列化程序实现的,该序列化程序在内部使用从 google protobuf.proto文件生成的类。并且这个自定义序列化器被配置application.conf并绑定到 base trait Event。这已经很好了。

注意:的实现Event不是protobuf 生成的类它们是普通的 scala 类,它们也有一个等价的 protobuf,但它是通过绑定到基本 Event 类型的自定义序列化程序映射的。这是我的前辈为版本控制完成的(这可能不是必需的,因为这也可以使用普通的 protobuf 类 + 自定义序列化组合来处理,但这是另一回事),我不想更改那个 atm。

我们现在正在尝试为这个参与者实现集群分片,这也意味着我的命令(即PersistPersisted)也需要可序列化,因为它们可能会被转发到其他节点。

这是域模型:

sealed trait PersistenceCommand {
  def event: Event
}

final case class Persisted(event: Event) extends PersistenceCommand
final case class Persist(event: Event) extends PersistenceCommand

问题是,我没有看到一种使其可序列化的优雅方法。以下是我考虑过的选项

方法 1.Persist为和定义一个新的 proto 文件Persisted,但是我使用什么作为数据类型event呢?我没有找到一种方法来定义这样的东西:

  message Persist {
   "com.example.Event" event = 1 // this doesn't work
   }

这样我就可以使用现有的 Scala 特征Event作为数据类型。如果这可行,我想(虽然它很牵强)我可以将生成的代码(在编译这个 proto 文件之后)绑定到 akka 的谷歌 protobuf 的内置序列化程序,它可能会工作。上面的注释解释了为什么我不能oneof在我的 proto 文件中使用构造。

方法 2。这是我已经实现的并且有效(但我不喜欢它)

基本上,我为命令编写了一个新的序列化程序,并将部分命令的序列化和反序列化委托event给现有的序列化程序。

class PersistenceCommandSerializer extends SerializerWithStringManifest {
  val eventSerializer: ManifestAwareEventSerializer = new ManifestAwareEventSerializer()

  val PersistManifest   = Persist.getClass.getName
  val PersistedManifest = Persisted.getClass.getName
  val Separator         = "~"

  override def identifier: Int = 808653986

  override def manifest(o: AnyRef): String = o match {
    case Persist(event)   => s"$PersistManifest$Separator${eventSerializer.manifest(event)}"
    case Persisted(event) => s"$PersistedManifest$Separator${eventSerializer.manifest(event)}"
  }

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case command: PersistenceCommand => eventSerializer.toBinary(command.event)
  }

  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
    val (commandManifest, dataManifest) = splitIntoCommandAndDataManifests(manifest)
    val event                           = eventSerializer.fromBinary(bytes, dataManifest).asInstanceOf[Event]
    commandManifest match {
      case PersistManifest =>
        Persist(event)
      case PersistedManifest =>
        Persisted(event)
    }
  }

  private def splitIntoCommandAndDataManifests(manifest: String) = {
    val commandAndDataManifests = manifest.split(Separator)
    (commandAndDataManifests(0), commandAndDataManifests(1))
  }
}

这种方法的问题是我def manifestdef fromBinary. 在序列化和反序列化时,我必须确保我拥有命令的清单以及事件的清单。因此,我不得不将其~用作分隔符——类似于我对清单信息的自定义序列化技术。

是否有更好的或正确的方法来实现这一点?

对于上下文:我使用 ScalaPB 从.proto文件和经典的 akka 演员生成 scala 类。

非常感谢任何形式的指导!

4

1 回答 1

1

如果您将嵌套对象的序列化委托给您为序列化数据配置的嵌套字段应该具有bytes的任何序列化程序,而且还具有int32使用的序列化程序的 id 和bytes消息清单。这确保您将能够版本/替换嵌套的序列化程序,这对于将存储更长时间的数据很重要。

您可以在此处查看 Akka 内部如何针对我们自己的有线格式完成此操作:https ://github.com/akka/akka/blob/6bf20f4117a8c27f8bd412228424caafe76a89eb/akka-remote/src/main/protobuf/WireFormats.proto#L48和此处https://github.com/akka/akka/blob/6bf20f4117a8c27f8bd412228424caafe76a89eb/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala#L45

于 2020-02-18T08:53:13.980 回答