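We are trying to use protobuf with Akka and serialize all Akka messages through protobuf. For Scala, there is a library called ScalaPB that generates the classes for us, including methods such as `parseFrom` and `toByteArray` for deserializing and serializing data. However, when we run the program, we get the following exception: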

akka.actor.dungeon.SerializationCheckFailedException: Failed to serialize and deserialize message of type com.knoldus.akkaserialization.msg.example.Bang$ for testing. To avoid this error, either disable 'akka.actor.serialize-messages', mark the message with 'akka.actor.NoSerializationVerificationNeeded', or configure serialization to support this message

The application.conf file contains the following configuration:

akka {
    actor {
        allow-java-serialization = off
        serialize-messages = on
        serializers {
            proto = "akka.remote.serialization.ProtobufSerializer"
        }

        serialization-bindings {
            "com.knoldus.akkaserialization.msg.example.Bang" = proto
            "com.knoldus.akkaserialization.msg.example.Message" = proto
        }
    }
}

The classes `com.knoldus.akkaserialization.msg.example.Bang` and `com.knoldus.akkaserialization.msg.example.Message` are generated by ScalaPB and contain all the required methods.

The source code of `akka.remote.serialization.ProtobufSerializer` describes it as follows:

This Serializer serializes `akka.protobuf.Message` and `com.google.protobuf.Message` It is using reflection to find the `parseFrom` and `toByteArray` methods to avoid dependency to `com.google.protobuf`

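So we expected it to pick up our case classes `Bang` and `Message` automatically and serialize them, but unfortunately we get the serialization exception instead.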

Can you help us figure out what exactly goes wrong between ScalaPB and protobuf here?

1 Answer

The serializer you are using is designed to work with Java protobuf, not with ScalaPB. You need to provide your own serializer. Here is one you can use:

package com.example.protoser

import java.util.concurrent.atomic.AtomicReference

import akka.actor.ExtendedActorSystem
import akka.serialization.BaseSerializer
import scalapb.GeneratedMessageCompanion

class ScalaPbSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
  private val classToCompanionMapRef = new AtomicReference[Map[Class[_], GeneratedMessageCompanion[_]]](Map.empty)

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case e: scalapb.GeneratedMessage => e.toByteArray
    case _ => throw new IllegalArgumentException("Need a subclass of scalapb.GeneratedMessage")
  }

  override def includeManifest: Boolean = true

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    manifest match {
      case Some(clazz) =>
        @scala.annotation.tailrec
        def messageCompanion(companion: GeneratedMessageCompanion[_] = null): GeneratedMessageCompanion[_] = {
          val classToCompanion = classToCompanionMapRef.get()
          classToCompanion.get(clazz) match {
            case Some(cachedCompanion) => cachedCompanion
            case None =>
              val uncachedCompanion =
                if (companion eq null) Class.forName(clazz.getName + "$", true, clazz.getClassLoader)
                  .getField("MODULE$").get(null).asInstanceOf[GeneratedMessageCompanion[_]]
                else companion
              if (classToCompanionMapRef.compareAndSet(classToCompanion, classToCompanion.updated(clazz, uncachedCompanion)))
                uncachedCompanion
              else
                messageCompanion(uncachedCompanion)
          }
        }
        messageCompanion().parseFrom(bytes).asInstanceOf[AnyRef]
      case _ => throw new IllegalArgumentException("Need a ScalaPB companion class to be able to deserialize.")
    }
  }
}
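
The companion lookup inside `fromBinary` is the least obvious part of the serializer, so here is a minimal, dependency-free sketch of the same idea. It assumes nothing about Akka or ScalaPB: `Parser`, `Ping`, and `CompanionLookup` are hypothetical names invented for illustration, with `Parser` standing in for `scalapb.GeneratedMessageCompanion`. It relies on the fact that scalac compiles `object Ping` to a class named `Ping$` whose single instance is held in the static field `MODULE$`.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for scalapb.GeneratedMessageCompanion.
trait Parser[A] {
  def parseFrom(bytes: Array[Byte]): A
}

// Hypothetical message type; its "wire format" is just the payload length.
final case class Ping(size: Int)

object Ping extends Parser[Ping] {
  def parseFrom(bytes: Array[Byte]): Ping = Ping(bytes.length)
}

object CompanionLookup {
  // Immutable map behind an AtomicReference: lock-free reads, CAS on update.
  private val cache = new AtomicReference[Map[Class[_], Parser[_]]](Map.empty)

  // Load the class "<name>$" and read its static MODULE$ field to get the companion.
  private def loadCompanion(clazz: Class[_]): Parser[_] =
    Class.forName(clazz.getName + "$", true, clazz.getClassLoader)
      .getField("MODULE$")
      .get(null) // MODULE$ is static, so the receiver argument is null
      .asInstanceOf[Parser[_]]

  @scala.annotation.tailrec
  def companionOf(clazz: Class[_]): Parser[_] = {
    val snapshot = cache.get()
    snapshot.get(clazz) match {
      case Some(found) => found
      case None =>
        val loaded = loadCompanion(clazz)
        if (cache.compareAndSet(snapshot, snapshot.updated(clazz, loaded))) loaded
        else companionOf(clazz) // another thread changed the map; retry
    }
  }

  // Deserialize using only the runtime class, as a serializer's fromBinary must.
  def parse(clazz: Class[_], bytes: Array[Byte]): Any =
    companionOf(clazz).parseFrom(bytes)
}
```

With these definitions, `CompanionLookup.parse(classOf[Ping], Array[Byte](1, 2, 3))` returns `Ping(3)`, and repeated calls reuse the cached companion instead of going through reflection again.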

The configuration should look like this:

akka {
  actor {
    serializers {
      scalapb = "com.example.protoser.ScalaPbSerializer"
    }

    serialization-bindings {
      "scalapb.GeneratedMessage" = scalapb
    }

    serialization-identifiers {
      "com.example.protoser.ScalaPbSerializer" = 10000
    }
  }
}
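
To check that the binding is picked up, a round trip through Akka's `SerializationExtension` can help. The following is only a sketch under several assumptions: the object name `SerializerCheck` and the actor system name are made up, `Bang` is the ScalaPB-generated class from the question and can be built with `Bang()` (that is, all of its fields have defaults), and the old `proto` bindings for `Bang` and `Message` have been removed so they do not take precedence over the `scalapb.GeneratedMessage` binding.

```scala
import akka.actor.ActorSystem
import akka.serialization.SerializationExtension
import com.knoldus.akkaserialization.msg.example.Bang // generated by ScalaPB (from the question)

object SerializerCheck extends App {
  // Loads application.conf from the classpath, including the bindings above.
  val system = ActorSystem("serializer-check")
  try {
    val serialization = SerializationExtension(system)
    val original = Bang() // assumption: every field of Bang has a default value

    // Shows which serializer Akka resolved for this message.
    val serializer = serialization.findSerializerFor(original)
    println(s"Serializer in use: ${serializer.getClass.getName}")

    // Round trip: both calls return a Try, so .get rethrows any failure.
    val bytes = serialization.serialize(original).get
    val restored = serialization.deserialize(bytes, classOf[Bang]).get
    assert(restored == original, "round trip changed the message")
  } finally {
    system.terminate()
  }
}
```

If the printed class is not `com.example.protoser.ScalaPbSerializer`, a more specific entry in `serialization-bindings` is probably still matching the message type.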

The above was adapted from older code; edits and suggestions are welcome!

answered 2019-03-25T15:58:36.930