6

我需要编写一个读取 DataSet[Row] 并将其转换为 DataSet[CustomClass] 的作业,其中 CustomClass 是一个 protobuf 类。

val protoEncoder = Encoders.bean(classOf[CustomClass])
val transformedRows = rows.map {
  case Row(f1: String, f2: Long ) => {
  val pbufClass = CustomClass.newBuilder()
                             .setF1(f1)
                             .setF2(f2)
  pbufClass.build()}}(protoEncoder)

但是,看起来 Protobuf 类并不是真正的 Java Bean,我确实在以下方面获得了 NPE

val x =  Encoders.bean(classOf[CustomClass])

如何确保作业可以发出 DataSet[CustomClass] 类型的数据集,其中 CustomClass 是 protobuf 类。有关为该类编写自定义编码器的任何指针/示例?

NPE:

val encoder2 = Encoders.bean(classOf[CustomClass])
java.lang.NullPointerException
  at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
  at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:125)
  at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:89)
  at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
  ... 48 elided

Bean 编码器内部使用

JavaTypeInference.serializerFor(protoClass)

如果我尝试在我的自定义编码器中做同样的事情,我会收到一条更具描述性的错误消息:

Caused by: java.lang.UnsupportedOperationException: Cannot infer type for class xxx.yyy.CustomClass because it is not bean-compliant
        at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:430)
        at org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:337)
        at xxx.yyy..EncoderHolder$.protoEncoder(xxx.scala:69)
        at xxx.yyy..EncoderHolder$.encoder$lzycompute$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.encoder$1(xxx.scala:82)
        at xxx.yyy..EncoderHolder$.liftedTree1$1(xxx.scala:84)
        at xxx.yyy..EncoderHolder$.<init>(xxx.scala:81)
        at xxx.yyy..EncoderHolder$.<clinit>(xxx.scala)
4

5 回答 5

3

我在编码器方面的经验并不是很有希望,在这一点上,我建议不要在这方面花费更多时间。

我宁愿考虑替代方案以及如何以自己的方式使用 Spark,并在最后一步将 Spark 计算的结果映射到 protobuf 生成的类。

于 2017-06-27T01:30:43.920 回答
2

要将 Row 转换为 Protobuf 类,您可以使用sparksql-protobuf

该库提供了在 SparkSQL 中使用 Protobuf 对象的实用程序。它提供了一种读取 SparkSQL 写入的 parquet 文件作为兼容 protobuf 对象的 RDD 的方法。它还可以将 protobuf 对象的 RDD 转换为 DataFrame。

将依赖项添加到您的build.sbt文件

resolvers += Resolver.jcenterRepo

libraryDependencies ++= Seq(
    "com.github.saurfang" %% "sparksql-protobuf" % "0.1.2",
    "org.apache.parquet" % "parquet-protobuf" % "1.8.1"

)

您可以按照库中的一些示例开始

示例 1

示例 2

我希望这有帮助!

于 2017-06-26T22:23:22.620 回答
0

我这样做的方式:我使用了 saurfang 的 sparksql-protobuf 库(代码在 Github 上可用)。你直接得到一个 RDD[ProtoSchema],但是它很难转换成一个 Dataset[ProtoSchema]。我主要用它来获取信息以附加到另一个具有用户定义函数的 RDD。

1:导入库

使用 Maven:

<dependencies>
    <dependency>
        <groupId>com.github.saurfang</groupId>
        <artifactId>sparksql-protobuf_2.10</artifactId>
        <version>0.1.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-protobuf</artifactId>
        <version>1.9.0</version>
    </dependency>

    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.5.1</version>
    </dependency>
</dependencies>
...

<repositories>
    <repository>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <id>bintray-saurfang-maven</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/saurfang/maven</url>
    </repository>
</repositories>

2:读取数据作为RDD[ProtoSchema]

val sess: SparkSession = ...
val proto_rdd = new ProtoParquetRDD[ProtoSchema](sess.sparkContext, input_path, classOf[ProtoSchema])

(可选)添加 PathFilter(Hadoop API)

如果您想添加一个 PathFilter 类(就像您以前使用 Hadoop 一样),或者激活您在 Hadoop 中拥有的其他选项,您可以执行以下操作:

sess.sparkContext.hadoopConfiguration.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true)
sess.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[MyPathFiltering], classOf[PathFilter])

但是不要忘记清除您的 Hadoop 配置,以防您想使用 SparkSession 读取其他内容:

sess.sparkContext.hadoopConfiguration.clear()
于 2018-02-23T08:41:45.840 回答
0

虽然不是一个严格的答案,但我确实得到了解决方法。如果我们使用 RDD,则不需要编码器。

val rows =
      spark.sql("select * from tablename").as[CaseClass].rdd
val transformedRows = rows.map {
  case Row(f1: String, f2: Long ) => {
  val pbufClass = CustomClass.newBuilder()
                             .setF1(f1)
                             .setF2(f2)
  pbufClass.build()}}

这给了我一个可以使用的 Protobuf 类的 RDD。

于 2017-06-27T20:18:40.013 回答
0

默认序列化也不适用于我的 protobuf 对象。

然而,事实证明 spark 内部使用的是 kryo。所以如果你这样做

Encoders.kryo(ProtoBuffObject.class)

有效。

于 2020-12-04T08:34:57.640 回答