3

更新:请坚持这个问题。我发现这可能是 Spark 1.5 本身的问题,因为我没有使用官方版本的 Spark。我会不断更新这个问题。谢谢!

最近在使用 Spark-CSV 将 CSV 导入 Spark 中的 DataFrame 时,我注意到一个奇怪的错误。

这是我的示例代码:

  object sparktry
  {
    def main(args: Array[String])
    {
      AutoLogger.setLevel("INFO")

      val sc = SingletonSparkContext.getInstance()
      val sql_context = SingletonSQLContext.getInstance(sc)

      val options = new collection.mutable.HashMap[String, String]()
      options += "header" -> "true"
      options += "charset" -> "UTF-8"

      val customSchema = StructType(Array(
        StructField("Year", StringType),
        StructField("Brand", StringType),
        StructField("Category", StringType),
        StructField("Model", StringType),
        StructField("Sales", DoubleType)))

      val dataFrame = sql_context.read.format("com.databricks.spark.csv")
      .options(options)
      .schema(customSchema)
      .load("hdfs://myHDFSserver:9000/BigData/CarSales.csv")

      dataFrame.head(10).foreach(x => AutoLogger.info(x.toString))
    }
  }

CarSales 是一个非常小的 csv。我注意到当spark.masteris not时local,设置spark.executor.memory为 16GB 以上会导致 DataFrame 损坏。该程序的输出如下所示:(我从日志中复制了文本,在本例spark.executor.memory中设置为 32GB)

16/03/07 12:39:50.190 INFO DAGScheduler: Job 1 finished: head at sparktry.scala:35, took 8.009183 s
16/03/07 12:39:50.225 INFO AutoLogger$: [       ,  ,      ,ries       ,142490.0]
16/03/07 12:39:50.225 INFO AutoLogger$: [       ,  ,      ,ries       ,112464.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,      ,ries       ,90960.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,      ,ries       ,100910.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,      ,ries       ,94371.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,      ,ries       ,54142.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,       ,ries       ,14773.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,       ,ries       ,12276.0]
16/03/07 12:39:50.227 INFO AutoLogger$: [       ,  ,       ,ries       ,9254.0]
16/03/07 12:39:50.227 INFO AutoLogger$: [       ,  ,       ,ries       ,12253.0]

虽然文件的前 10 行是:

1/1/2007,BMW,Compact,BMW 3-Series,142490.00
1/1/2008,BMW,Compact,BMW 3-Series,112464.00
1/1/2009,BMW,Compact,BMW 3-Series,90960.00
1/1/2010,BMW,Compact,BMW 3-Series,100910.00
1/1/2011,BMW,Compact,BMW 3-Series,94371.00
1/1/2007,BMW,Compact,BMW 5-Series,54142.00
1/1/2007,BMW,Fullsize,BMW 7-Series,14773.00
1/1/2008,BMW,Fullsize,BMW 7-Series,12276.00
1/1/2009,BMW,Fullsize,BMW 7-Series,9254.00
1/1/2010,BMW,Fullsize,BMW 7-Series,12253.00

我注意到spark.executor.memory在我的机器上只更改为 16GB,前 10 行是正确的,但将其设置为超过 16GB 会导致损坏。

更重要的是:在我的一台具有 256GB 内存的服务器上,将其设置为 16GB 也会产生此错误。相反,将其设置为 48GB 将使其正常工作。另外,我尝试打印dataFrame.rdd,它表明RDD的内容是正确的,而数据框本身是不正确的。

有人对这个问题有任何想法吗?

谢谢!

4

2 回答 2

1

事实证明这是在 Spark 1.5.1 和 1.5.2 中使用 Kyro 进行序列化的一个错误。

https://github.com/databricks/spark-csv/issues/285#issuecomment-193633716

这在 1.6.0 中已修复。它与 spark-csv 无关。

于 2016-03-08T07:52:18.707 回答
0

我运行了您的代码,并能够使用 Spark 的默认配置从 hdfs 获取 csv 数据。

我为以下几行更新了您的代码:

val conf = new org.apache.spark.SparkConf().setMaster("local[2]").setAppName("HDFSReadDemo");
val sc = new org.apache.spark.SparkContext(conf); 
val sql_context = new org.apache.spark.sql.SQLContext(sc) 

并 println() 代替记录器。

dataFrame.head(10).foreach(x => println(x))

所以Spark内存配置应该没有问题(即spark.executor.memory)

于 2016-03-07T15:11:29.580 回答