2

我在尝试在我的 spark 作业中解析 json 时遇到了问题。我正在使用带有 DSE 4.6 的 spark 1.1.0、json4s 和 Cassandra Spark 连接器。抛出的异常是:

org.json4s.package$MappingException: Can't find constructor for BrowserData      org.json4s.reflect.ScalaSigReader$.readConstructor(ScalaSigReader.scala:27)
   org.json4s.reflect.Reflector$ClassDescriptorBuilder.ctorParamType(Reflector.scala:108)
        org.json4s.reflect.Reflector$ClassDescriptorBuilder$$anonfun$6.apply(Reflector.scala:98)
        org.json4s.reflect.Reflector$ClassDescriptorBuilder$$anonfun$6.apply(Reflector.scala:95)
        scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

我的代码如下所示:

case class BrowserData(navigatorObjectData: Option[NavigatorObjectData],
                       flash_version: Option[FlashVersion],
                       viewport: Option[Viewport],
                       performanceData: Option[PerformanceData])

.... other case classes

def parseJson(b: Option[String]): Option[String] = {
    implicit val formats = DefaultFormats
      for {
        browserDataStr <- b
        browserData = parse(browserDataStr).extract[BrowserData]
        navObject <- browserData.navigatorObjectData
        userAgent <- navObject.userAgent
      } yield (userAgent)
  }

def getJavascriptUa(rows: Iterable[com.datastax.spark.connector.CassandraRow]): Option[String] = {
  implicit val formats = DefaultFormats
  rows.collectFirst { case r if r.getStringOption("browser_data").isDefined  =>
    parseJson(r.getStringOption("browser_data"))
  }.flatten
}

def getRequestUa(rows: Iterable[com.datastax.spark.connector.CassandraRow]): Option[String] = {
  rows.collectFirst { case r if r.getStringOption("ua").isDefined  =>
    r.getStringOption("ua")
  }.flatten
}

def checkUa(rows: Iterable[com.datastax.spark.connector.CassandraRow], sessionId: String): Option[Boolean] = {
  for {
    jsUa <- getJavascriptUa(rows)
    reqUa <- getRequestUa(rows)
  } yield (jsUa == reqUa)
}

def run(name: String) = {
  val rdd = sc.cassandraTable("beehive", name).groupBy(r => r.getString("session_id"))
  val counts = rdd.map(r => (checkUa(r._2, r._1)))
  counts
}

我使用:load将文件加载到 REPL 中,然后调用该run函数。parseJson据我所知,故障发生在功能中。我已经尝试了很多方法来尝试让它发挥作用。从类似的帖子中,我确保我的案例类位于文件的顶层。我尝试将案例类定义编译到一个 jar 中,并像这样将 jar 包括在内:/usr/bin/dse spark --jars case_classes.jar

我试过像这样将它们添加到conf中:sc.getConf.setJars(Seq("/home/ubuntu/case_classes.jar"))

仍然是同样的错误。我应该把我所有的代码编译成一个jar吗?这是火花问题还是 JSON4s 问题?任何帮助都表示赞赏。

4

0 回答 0