1

我正在尝试创建一个使用 ProtoBuf 编码的 Kafka 消息的 Spark Streaming。

这是我最近几天尝试的

    import spark.implicits._
    def parseLine (str: Array[Byte]): ProtoSchema = ProtoSchema.parseFrom(str)   
    val storageLoc: String = "/tmp/avl/output"
    val checkpointLoc: String = "/tmp/avl/checkpoint"
    val dfStreamReader: DataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("failOnDataLoss", value = false)
      .option("subscribe", topics)
      .load()

    val dfStreamReaderValues: Dataset[Array[Byte]] = dfStreamReader.map(row => row.getAs[Array[Byte]]("value"))

    val rddProtoSchema: Dataset[ProtoSchema] = dfStreamReaderValues.map(str => parseLine(str))

    val dfRaw: DataFrame = spark.sqlContext.protoToDataFrame(rddProtoSchema.rdd)

    val streamWriterAirline: StreamingQuery = dfRaw.writeStream
      .format("parquet")
      .option("path", storageLoc)
      .option("checkpointLocation", checkpointLoc)
      .outputMode(Append)
      .trigger(ProcessingTime("2 seconds"))
      .start()
    spark.streams.awaitAnyTermination(20000)

使用 scalapb,我设法解码二进制 proto 文件并转换为数据帧。但是对于流式传输,我在解析行的编译时得到了这个异常:

    val rddProtoSchema: Dataset[ProtoSchema] = dfStreamReaderValues.map(str => parseLine(str))
    >>>>>
    scala.ScalaReflectionException: <none> is not a term

谁能给点提示?

4

1 回答 1

1

更新:sparksql-scalapb 现在能够为协议缓冲区派生编码器,并且不再需要以前使用 UDT 生成器的方法。说明可在此处获得


旧答案(现在不相关):使用数据集时,Spark 会尝试为消息中的每个字段查找 SQL 类型。Spark 不知道如何处理 ScalaPB 枚举(它们被表示为由 case 对象扩展的密封特征),因此它给出了这个错误。解决方法是注册枚举和用户定义的类型。这可以按如下方式完成:

  1. 将 sparksql-scalapb-gen 添加到您的project/plugins.sbt (而不是您的 mainbuild.sbt):
libraryDependencies += "com.thesamet.scalapb" %% "sparksql-scalapb-gen" % "0.8.1"

检查上面的版本是否与sparksql-scalapb您正在使用的版本匹配。

  1. 将此生成器添加到您的PB.targetsin build.sbt
PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value,
  scalapb.UdtGenerator -> (sourceManaged in Compile).value
)
  1. 再生源(可能需要sbt clean后跟sbt compile

  2. 在您的主函数中调用生成的注册函数。这将是mypackage.MyFileUdt.register()

见:https ://scalapb.github.io/sparksql.html#datasets-and-none-is-not-a-term

示例项目:https ://github.com/thesamet/sparksql-scalapb-test

于 2019-10-06T19:36:24.027 回答