11

我在尝试在我的 spark 作业中解析 json 时遇到了问题。我正在使用spark 1.1.0,json4sCassandra Spark Connector. 抛出的异常是:

java.io.NotSerializableException: org.json4s.DefaultFormats

检查 DefaultFormats 伴随对象,并通过这个堆栈问题,很明显 DefaultFormats 不能被序列化。现在的问题是该怎么做。

我可以看到这张显然通过添加关键字瞬态在 spark 代码库中解决了这个问题,但我不确定如何或在何处将其应用于我的案例。解决方案是否只在执行程序上实例化 DefaultFormats 类,以避免一起序列化?人们正在使用另一个用于 scala/spark 的 JSON 解析库吗?我最初尝试单独使用 jackson,但遇到了一些我无法轻松解决的注释错误,并且 json4s 开箱即用。这是我的代码:

import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats

val count = rdd.map(r => checkUa(r._2, r._1)).reduce((x, y) => x + y) 

我在 checkUa 函数中进行 json 解析。我试着让 count 变得懒惰,希望它能以某种方式延迟执行,但它没有效果。也许在 checkUA 中移动隐式 val?非常感谢任何建议。

4

2 回答 2

19

这已经在json4s 的公开票中得到了回答。解决方法是将implicit声明放在函数内部

val count = rdd
               .map(r => {implicit val formats = DefaultFormats; checkUa(r._2, r._1)})
               .reduce((x, y) => x + y) 
于 2015-04-15T23:05:01.757 回答
3

当我将implicit val formats = ...声明放在包含解析的方法中时,我遇到了同样的错误,而不是在类(对象)上声明它。

所以这会抛出一个错误:

object Application {

  //... Lots of other code here, which eventually calls 
  // setupStream(...)

  def setupStream(streamingContext: StreamingContext,
                          brokers: String,
                          topologyTopicName: String) = {
    implicit val formats = DefaultFormats
    _createDStream(streamingContext, brokers, topologyTopicName)
      // Remove the message key, which is always null in our case
      .map(_._2)
      .map((json: String) => parse(json).camelizeKeys
        .extract[Record[TopologyMetadata, Unused]])
      .print()
}

但这很好:

object Application {

  implicit val formats = DefaultFormats

  //... Lots of other code here, which eventually calls 
  // setupStream(...)

  def setupStream(streamingContext: StreamingContext,
                          brokers: String,
                          topologyTopicName: String) = {
    _createDStream(streamingContext, brokers, topologyTopicName)
      // Remove the message key, which is always null in our case
      .map(_._2)
      .map((json: String) => parse(json).camelizeKeys
        .extract[Record[TopologyMetadata, Unused]])
      .print()
}
于 2016-04-25T12:45:57.113 回答