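I get a strange error whenever I recreate the Spark SQL context (delete the context and create it again) and run the job a second time. From the second run onward, it always throws this exception.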

[2016-09-20 13:52:28,743] ERROR .jobserver.JobManagerActor [] [akka://JobServer/user/context-supervisor/ctx] - Exception from job 23fe1335-55ec-47b2-afd3-07396483eae0:
java.lang.RuntimeException: Error while encoding: java.lang.ClassCastException: org.lala.Country cannot be cast to org.lala.Country
staticinvoke(class org.apache.spark.unsafe.types.UTF8String,StringType,fromString,invoke(input[0, ObjectType(class org.lala.Country)],code,ObjectType(class java.lang.String)),true) AS code#10
+- staticinvoke(class org.apache.spark.unsafe.types.UTF8String,StringType,fromString,invoke(input[0, ObjectType(class org.lala.Country)],code,ObjectType(class java.lang.String)),true)
   +- invoke(input[0, ObjectType(class org.lala.Country)],code,ObjectType(class java.lang.String))
      +- input[0, ObjectType(class org.lala.Country)]

        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:220)
        at org.apache.spark.sql.SQLContext$$anonfun$8.apply(SQLContext.scala:504)
        at org.apache.spark.sql.SQLContext$$anonfun$8.apply(SQLContext.scala:504)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:504)
        at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:141)
        at org.lala.HelloJob$.runJob(HelloJob.scala:18)
        at org.lala.HelloJob$.runJob(HelloJob.scala:13)
        at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:301)

My Spark job class:

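import com.typesafe.config.Config
import org.apache.spark.sql.SQLContext
import spark.jobserver.{SparkJobValid, SparkJobValidation, SparkSqlJob}

// Defined at the top level so that Spark can derive an encoder for it.
case class Country(code: String)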

object TestJob extends SparkSqlJob {
  override def runJob(sc: SQLContext, jobConfig: Config): Any = {
    import sc.implicits._

    val country = List(Country("A"),Country("B"))
    val countryDS = country.toDS()
    countryDS.collect().foreach(println)
  }

  override def validate(sc: SQLContext, config: Config): SparkJobValidation = {
    SparkJobValid
  }
}

I am using:

  • Spark 1.6.1
  • Spark Job Server 0.6.2 (docker)
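
For context on the message itself: on the JVM, "X cannot be cast to X" generally means the same class name was loaded by two different class loaders. Whether that is what happens when the context is recreated here is only a guess. The sketch below reproduces the symptom outside Spark. It is not the job server's mechanism: `LoaderDemo` and the plain (non-case) `Country` class are hypothetical names for illustration, and it assumes the classes are compiled to a directory or jar on disk, so that `getCodeSource` is not null.

```scala
import java.net.URLClassLoader

// Plain class (not a case class) so it can be loaded without the Scala library.
class Country(val code: String)

object LoaderDemo {
  def main(args: Array[String]): Unit = {
    val original = classOf[Country]

    // Directory or jar the class was loaded from (assumed to be on disk).
    val location = original.getProtectionDomain.getCodeSource.getLocation

    // A second loader with no parent re-defines the class instead of delegating.
    val isolated = new URLClassLoader(Array(location), null)
    val reloaded = isolated.loadClass(original.getName)

    println(s"same name:  ${reloaded.getName == original.getName}")
    println(s"same class: ${reloaded == original}") // expected: false, the loaders differ

    // Build an instance through the second loader, then cast to the first class.
    val instance = reloaded
      .getConstructor(classOf[String])
      .newInstance("A")
      .asInstanceOf[AnyRef]

    try {
      original.cast(instance)
      println("cast succeeded")
    } catch {
      case e: ClassCastException => println(s"cast failed: ${e.getMessage}")
    }
  }
}
```

If the two loaders differ, the cast fails even though both classes print the same fully qualified name, which matches the shape of the error above.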