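Whenever I recreate the Spark SQL context (delete the context and create it again) and run the job a second time, I get a strange error. From then on, every run throws this exception: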
[2016-09-20 13:52:28,743] ERROR .jobserver.JobManagerActor [] [akka://JobServer/user/context-supervisor/ctx] - Exception from job 23fe1335-55ec-47b2-afd3-07396483eae0:
java.lang.RuntimeException: Error while encoding: java.lang.ClassCastException: org.lala.Country cannot be cast to org.lala.Country
staticinvoke(class org.apache.spark.unsafe.types.UTF8String,StringType,fromString,invoke(input[0, ObjectType(class org.lala.Country)],code,ObjectType(class java.lang.String)),true) AS code#10
+- staticinvoke(class org.apache.spark.unsafe.types.UTF8String,StringType,fromString,invoke(input[0, ObjectType(class org.lala.Country)],code,ObjectType(class java.lang.String)),true)
+- invoke(input[0, ObjectType(class org.lala.Country)],code,ObjectType(class java.lang.String))
+- input[0, ObjectType(class org.lala.Country)]
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:220)
at org.apache.spark.sql.SQLContext$$anonfun$8.apply(SQLContext.scala:504)
at org.apache.spark.sql.SQLContext$$anonfun$8.apply(SQLContext.scala:504)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:504)
at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:141)
at org.lala.HelloJob$.runJob(HelloJob.scala:18)
at org.lala.HelloJob$.runJob(HelloJob.scala:13)
at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:301)
My Spark class:
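import com.typesafe.config.Config
import org.apache.spark.sql.SQLContext
import spark.jobserver.{SparkJobValid, SparkJobValidation, SparkSqlJob}

case class Country(code: String)

object TestJob extends SparkSqlJob {
  override def runJob(sc: SQLContext, jobConfig: Config): Any = {
    import sc.implicits._
    // Build a local list and convert it to a Dataset; the encoder fails here on the second run
    val country = List(Country("A"), Country("B"))
    val countryDS = country.toDS()
    countryDS.collect().foreach(println)
  }

  override def validate(sc: SQLContext, config: Config): SparkJobValidation = {
    SparkJobValid
  }
}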
I am using:
- Spark 1.6.1
- Spark Job Server 0.6.2 (docker)