基本上,我必须使用 Spark 分析 HDFS 上的一些复杂 JSON。
我使用“用于理解”来(预)过滤 JSON 和 json4s 的“提取”方法以将其包装到案例类中
这个很好用!
def foo(rdd: RDD[String]) = {
case class View(C: String,b: Option[Array[List[String]]], t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats
rdd.map { jsonString =>
val jsonObj = parse(jsonString)
val listsOfView = for {
JObject(value) <- jsonObj
JField(("v"), JObject(views)) <- value
normalized <- views.map(x => (x._2))
} yield normalized
}
到目前为止,一切都很好!
当我尝试将(预)过滤的 JSON 提取到我的 CaseClass 时,我得到了这个:
线程“主”org.apache.spark.SparkException 中的异常:作业因阶段失败而中止:任务不可序列化:java.io.NotSerializableException:org.json4s.DefaultFormats$
这里是提取代码:
def foo(rdd: RDD[String]) = {
case class View(C: String,b: Option[Array[List[String]]], t: Time)
case class Time($numberLong: String)
implicit val formats = DefaultFormats
rdd.map { jsonString =>
val jsonObj = parse(jsonString)
val listsOfView = for {
JObject(value) <- jsonObj
JField(("v"), JObject(views)) <- value
normalized <- views.map(x => (x._2))
} yield normalized.extract[View]
}
我已经在 scala ws 上尝试了我的代码,并且它的工作!我对 hdfs 和 spark 的东西真的很陌生,所以我会很感激一个提示。