4

我正在使用 2.1.1 版编写 Spark 应用程序。以下代码在调用带有 LocalDate 参数的方法时出错?

线程“主”java.lang.UnsupportedOperationException 中的异常:找不到 java.time.LocalDate 的编码器
- 字段(类:“java.time.LocalDate”,名称:“_2”)
- 根类:“scala.Tuple2”
        在 org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
        在 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
        在 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
        在 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        在 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        在 scala.collection.immutable.List.foreach(List.scala:381)
        在 scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        在 scala.collection.immutable.List.flatMap(List.scala:344)
        在 org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:587)
……
val date : LocalDate = ....
val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
import sqlContext.implicits._ 
val processed = itemListJob.run(rc, priority).select("id").map(d => {
  runJob.run(d, date) 
})

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    import sqlContext.implicits._ 
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
    .select("id") 
    .as[Int] 
  }
}

更新: 我将返回类型更改runJob.run()为元组(int, java.sql.Date)并将 lambda 中的代码更改.map(...)

val processed = itemListJob.run(rc, priority).select("id").map(d => {
  val (a,b) = runJob.run(d, date) 
  $"$a, $b"
})

现在错误变为

[错误] C:\....\scala\main.scala:40: 找不到存储在数据集中的类型的编码器。通过导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。未来版本中将添加对序列化其他类型的支持。
[错误] val 处理 = itemListJob.run(rc, priority).map(d => {
[错误] ^
[错误] 发现一个错误
[错误] (compile:compileIncremental) 编译失败
4

1 回答 1

1

对于自定义数据集类型,您可以使用 Kyro serde 框架,只要您的数据实际上是可序列化的(也就是实现可序列化)。这是使用 Kyro 的一个示例:Spark No Encoder found for java.io.Serializable in Map[String, java.io.Serializable]

始终推荐使用 Kyro,因为它速度更快并且与 Java serde 框架兼容。您绝对可以选择 Java 原生 serde(ObjectWriter/ObjectReader),但速度要慢得多。

就像上面的评论一样,SparkSQL 在 下提供了许多有用的编码器sqlContext.implicits._,但这并不能涵盖所有内容,因此您可能必须插入自己的编码器。

就像我说的,你的自定义数据必须是可序列化的,并且根据https://docs.oracle.com/javase/8/docs/api/java/time/LocalDate.html,它实现了 Serializable 接口,所以你肯定是这里好。

于 2018-11-18T07:31:35.070 回答