我正在使用 2.1.1 版编写 Spark 应用程序。以下代码在调用带有 LocalDate 参数的方法时出错?
线程“主”java.lang.UnsupportedOperationException 中的异常:找不到 java.time.LocalDate 的编码器 - 字段(类:“java.time.LocalDate”,名称:“_2”) - 根类:“scala.Tuple2” 在 org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602) 在 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596) 在 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587) 在 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 在 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 在 scala.collection.immutable.List.foreach(List.scala:381) 在 scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 在 scala.collection.immutable.List.flatMap(List.scala:344) 在 org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:587) ……
val date : LocalDate = ....
val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
import sqlContext.implicits._
val processed = itemListJob.run(rc, priority).select("id").map(d => {
runJob.run(d, date)
})
class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
def run(date: LocalDate) = {
import sqlContext.implicits._
sqlContext.read.format("jdbc").options(Map(
"driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
"url" -> jdbcSqlConn,
"dbtable" -> s"dbo.GetList('$date')"
)).load()
.select("id")
.as[Int]
}
}
更新:
我将返回类型更改runJob.run()
为元组(int, java.sql.Date)
并将 lambda 中的代码更改.map(...)
为
val processed = itemListJob.run(rc, priority).select("id").map(d => {
val (a,b) = runJob.run(d, date)
$"$a, $b"
})
现在错误变为
[错误] C:\....\scala\main.scala:40: 找不到存储在数据集中的类型的编码器。通过导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。未来版本中将添加对序列化其他类型的支持。 [错误] val 处理 = itemListJob.run(rc, priority).map(d => { [错误] ^ [错误] 发现一个错误 [错误] (compile:compileIncremental) 编译失败