9

基本上,我必须使用 Spark 分析 HDFS 上的一些复杂 JSON。

我使用“用于理解”来(预)过滤 JSON 和 json4s 的“提取”方法以将其包装到案例类中

这个很好用!

def foo(rdd: RDD[String]) = {

case class View(C: String,b: Option[Array[List[String]]],  t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats

rdd.map { jsonString =>
  val jsonObj = parse(jsonString)
  val listsOfView = for {
    JObject(value) <- jsonObj
    JField(("v"), JObject(views)) <- value
    normalized <- views.map(x => (x._2))
  } yield normalized
}

到目前为止,一切都很好!

当我尝试将(预)过滤的 JSON 提取到我的 CaseClass 时,我得到了这个:

线程“主”org.apache.spark.SparkException 中的异常:作业因阶段失败而中止:任务不可序列化:java.io.NotSerializableException:org.json4s.DefaultFormats$

这里是提取代码:

def foo(rdd: RDD[String]) = {

case class View(C: String,b: Option[Array[List[String]]],  t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats

rdd.map { jsonString =>
  val jsonObj = parse(jsonString)
  val listsOfView = for {
    JObject(value) <- jsonObj
    JField(("v"), JObject(views)) <- value
    normalized <- views.map(x => (x._2))
  } yield normalized.extract[View]
}

我已经在 scala ws 上尝试了我的代码,并且它的工作!我对 hdfs 和 spark 的东西真的很陌生,所以我会很感激一个提示。

4

3 回答 3

7

Spark 序列化 RDD 转换上的闭包并将其“运送”给工作人员以进行分布式执行。这要求闭包中的所有代码(通常也在包含对象中)应该是可序列化的。

看看org.json4s.DefaultFormat$的 impl (该特征的伴随对象):

object DefaultFormats extends DefaultFormats {
    val losslessDate = new ThreadLocal(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
    val UTC = TimeZone.getTimeZone("UTC")

}

很明显,这个对象是不可序列化的,也不能这样。(ThreadLocal 本质上是不可序列化的)

您似乎没有Date在代码中使用类型,那么您可以用可序列化的东西摆脱 implicit val formats = DefaultFormats或替换 DefaultFormats 吗?

于 2014-07-17T16:16:13.760 回答
3

这实际上已经得到修复;JSON4S 从 3.3.0 版开始可序列化: https ://github.com/json4s/json4s/issues/137

于 2015-10-02T00:22:46.833 回答
1

解决我的问题的是,我implicit val formats = DefaultFormatsrdd.foreach{}循环中使用。它解决了我的可序列化异常。

这是我解决问题的代码片段:

case class rfId(rfId: String) {}

// ... some code here ...

 rdd.foreach { record =>
    val value = record.value()

    // Bring in default date formats etc and makes json4s serializable
    implicit val formats = DefaultFormats
    val json = parse(value)
    println(json.camelizeKeys.extract[rfId])  // Prints `rfId(ABC12345678)`
 }
于 2018-09-19T08:35:03.940 回答