我在尝试在我的 spark 作业中解析 json 时遇到了问题。我正在使用带有 DSE 4.6 的 spark 1.1.0、json4s 和 Cassandra Spark 连接器。抛出的异常是:
org.json4s.package$MappingException: Can't find constructor for BrowserData org.json4s.reflect.ScalaSigReader$.readConstructor(ScalaSigReader.scala:27)
org.json4s.reflect.Reflector$ClassDescriptorBuilder.ctorParamType(Reflector.scala:108)
org.json4s.reflect.Reflector$ClassDescriptorBuilder$$anonfun$6.apply(Reflector.scala:98)
org.json4s.reflect.Reflector$ClassDescriptorBuilder$$anonfun$6.apply(Reflector.scala:95)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
我的代码如下所示:
case class BrowserData(navigatorObjectData: Option[NavigatorObjectData],
flash_version: Option[FlashVersion],
viewport: Option[Viewport],
performanceData: Option[PerformanceData])
.... other case classes
def parseJson(b: Option[String]): Option[String] = {
implicit val formats = DefaultFormats
for {
browserDataStr <- b
browserData = parse(browserDataStr).extract[BrowserData]
navObject <- browserData.navigatorObjectData
userAgent <- navObject.userAgent
} yield (userAgent)
}
def getJavascriptUa(rows: Iterable[com.datastax.spark.connector.CassandraRow]): Option[String] = {
implicit val formats = DefaultFormats
rows.collectFirst { case r if r.getStringOption("browser_data").isDefined =>
parseJson(r.getStringOption("browser_data"))
}.flatten
}
def getRequestUa(rows: Iterable[com.datastax.spark.connector.CassandraRow]): Option[String] = {
rows.collectFirst { case r if r.getStringOption("ua").isDefined =>
r.getStringOption("ua")
}.flatten
}
def checkUa(rows: Iterable[com.datastax.spark.connector.CassandraRow], sessionId: String): Option[Boolean] = {
for {
jsUa <- getJavascriptUa(rows)
reqUa <- getRequestUa(rows)
} yield (jsUa == reqUa)
}
def run(name: String) = {
val rdd = sc.cassandraTable("beehive", name).groupBy(r => r.getString("session_id"))
val counts = rdd.map(r => (checkUa(r._2, r._1)))
counts
}
我使用:load
将文件加载到 REPL 中,然后调用该run
函数。parseJson
据我所知,故障发生在功能中。我已经尝试了很多方法来尝试让它发挥作用。从类似的帖子中,我确保我的案例类位于文件的顶层。我尝试将案例类定义编译到一个 jar 中,并像这样将 jar 包括在内:/usr/bin/dse spark --jars case_classes.jar
我试过像这样将它们添加到conf中:sc.getConf.setJars(Seq("/home/ubuntu/case_classes.jar"))
仍然是同样的错误。我应该把我所有的代码编译成一个jar吗?这是火花问题还是 JSON4s 问题?任何帮助都表示赞赏。