0

我正在使用结构化流(Spark 2.0.2)来使用 kafka 消息。使用 scalapb,protobuf 中的消息。我收到以下错误。请帮忙..

线程“主”中的异常 scala.ScalaReflectionException:不是 scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199) 处的术语 scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols. scala:84) 在 org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811) 在 org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39) 在org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800) at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39) at org.apache.spark .sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582) 在 org.apache.spark.sql.catalyst。ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583) 在 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) 在 scala.collection。 TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)在 scala.collection.immutable.List.flatMap(List.scala:344) 在 org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:第583章)spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425) 在 org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61) 在 org.apache.spark.sql。 org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47) 的编码器$.product(Encoders.scala:27​​4) 在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala ) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect。 Method.invoke(Method.java:498) 在 com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)serializerFor(ScalaReflection.scala:425) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61) at org.apache.spark.sql.Encoders$.product(Encoders.scala: 274) 在 org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47) 在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0 (本机方法)在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498 ) 在 com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)serializerFor(ScalaReflection.scala:425) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61) at org.apache.spark.sql.Encoders$.product(Encoders.scala: 274) 在 org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47) 在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0 (本机方法)在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498 ) 在 com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)ExpressionEncoder$.apply(ExpressionEncoder.scala:61) at org.apache.spark.sql.Encoders$.product(Encoders.scala:27​​4) at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun .reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147 )ExpressionEncoder$.apply(ExpressionEncoder.scala:61) at org.apache.spark.sql.Encoders$.product(Encoders.scala:27​​4) at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun .reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147 )newProductEncoder(SQLImplicits.scala:47) 在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke( NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 com.intellij.rt.execution.application.AppMain .main(AppMain.java:147)newProductEncoder(SQLImplicits.scala:47) 在 PersonConsumer$.main(PersonConsumer.scala:33) 在 PersonConsumer.main(PersonConsumer.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke( NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 com.intellij.rt.execution.application.AppMain .main(AppMain.java:147)com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 的 java.lang.reflect.Method.invoke(Method.java:498) 的调用(DelegatingMethodAccessorImpl.java:43)com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 的 java.lang.reflect.Method.invoke(Method.java:498) 的调用(DelegatingMethodAccessorImpl.java:43)

以下是我的代码...

object PersonConsumer {
  import org.apache.spark.rdd.RDD
  import com.trueaccord.scalapb.spark._
  import org.apache.spark.sql.{SQLContext, SparkSession}
  import com.example.protos.demo._

  def main(args : Array[String]) {

    def parseLine(s: String): Person =
      Person.parseFrom(
        org.apache.commons.codec.binary.Base64.decodeBase64(s))

    val spark = SparkSession.builder.
      master("local")
      .appName("spark session example")
      .getOrCreate()

    import spark.implicits._

    val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","person").load()

    val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]

    val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")

    val ds4 = spark.sqlContext.sql("select name from persons")

    val query = ds4.writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
4

2 回答 2

0

在 Person 类中,gender 是一个枚举,这就是导致此问题的原因。删除此字段后,它工作正常。以下是我从 DataBricks 的 Shixiong(Ryan) 那里得到的答案。

问题是“可选的 Gender 性别 = 3;”。生成的“性别”类是一个 trait,Spark 不知道如何创建一个 trait,因此不支持。您可以定义 SQL 编码器支持的类,并将生成的类转换为parseLine.

于 2016-11-17T17:23:48.787 回答
0

val ds3应该是:

val ds3 = ds2.map(str => parseLine(str))

sqlContext.protoToDataFrame(ds3).registerTempTable("persons")

RDD在保存为临时表之前需要转换为数据框。

于 2016-11-17T15:51:01.810 回答