
I am trying to load a CSV file into a Spark DataFrame with spark-csv [1] from an Apache Zeppelin notebook. When a numeric field has no value, the parser fails for that row and the row is skipped.

I would have expected the row to be loaded with the missing value set to NULL in the DataFrame, so that aggregations simply ignore it.
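The NULL semantics I am expecting can be sketched in plain Scala (no Spark needed): a row with a missing value stays in the collection, and an aggregate simply skips the missing entry.

```scala
// Heights as nullable values: santa's height is missing (None), but his row is kept.
val heights: Seq[Option[Double]] = Seq(Some(184.0), Some(180.0), None)

val valid = heights.flatten          // NULL-like entries are skipped: Seq(184.0, 180.0)
val count = valid.size               // 2 -- the aggregate ignores the missing value
val mean  = valid.sum / valid.size   // 182.0 -- the mean Spark would report
val rows  = heights.size             // 3 -- but no row was dropped
```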

%dep
z.reset()
z.addRepo("my-nexus").url("<my_local_nexus_repo_that_is_a_proxy_of_public_repos>")
z.load("com.databricks:spark-csv_2.10:1.1.0")


%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("identifier", StringType, true) ::
    StructField("name", StringType, true) ::
    StructField("height", DoubleType, true) :: 
    Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
                        .schema(schema)
                        .option("header", "true")
                        .load("file:///home/spark_user/data.csv")

df.describe("height").show()

Here is the content of the data file /home/spark_user/data.csv:

identifier,name,height
1,sam,184
2,cath,180
3,santa,     <-- note that there is no height recorded for santa!

Here is the output:

+-------+------+
|summary|height|
+-------+------+
|  count|     2|    <- only 2 of 3 rows loaded, i.e. sam and cath
|   mean| 182.0|
| stddev|   2.0|
|    min| 180.0|
|    max| 184.0|
+-------+------+

In Zeppelin's logs I can see the following error while parsing santa's row:

ERROR [2015-07-21 16:42:09,940] ({Executor task launch worker-45} CsvRelation.scala[apply]:209) - Exception while parsing line: 3,santa,.
        java.lang.NumberFormatException: empty String
        at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)
        at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
        at java.lang.Double.parseDouble(Double.java:538)
        at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
        at scala.collection.immutable.StringOps.toDouble(StringOps.scala:31)
        at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:42)
        at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:198)
        at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:180)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:129)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
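The root cause can be reproduced in plain Scala without Spark: per the stack trace, spark-csv 1.1.0's `TypeCast.castTo` ends up calling `String#toDouble`, which throws on an empty string instead of producing a null. A null-tolerant cast would look something like this sketch (`safeCast` is my own name, not part of spark-csv):

```scala
import scala.util.Try

// What the parser effectively does: "".toDouble
// throws java.lang.NumberFormatException: empty String
def unsafeCast(s: String): Double = s.toDouble

// What I would want instead: empty fields become None (i.e. NULL)
def safeCast(s: String): Option[Double] =
  if (s == null || s.trim.isEmpty) None else Try(s.toDouble).toOption
```

`safeCast("")` yields `None` and `safeCast("184")` yields `Some(184.0)`, while `unsafeCast("")` throws exactly the exception in the log above.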

So far you might tell me: so far, so good... and you would be right ;)

Now I want to add an extra column, say age, for which every row has a value:

identifier,name,height,age
1,sam,184,30
2,cath,180,32
3,santa,,70

Now, politely asking for some statistics on age:

%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("identifier", StringType, true) ::
    StructField("name", StringType, true) ::
    StructField("height", DoubleType, true) :: 
    StructField("age", DoubleType, true) :: 
    Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
                        .schema(schema)
                        .option("header", "true")
                        .load("file:///home/spark_user/data2.csv")

df.describe("age").show()

The result:

+-------+----+
|summary| age|
+-------+----+
|  count|   2|
|   mean|31.0|
| stddev| 1.0|
|    min|30.0|
|    max|32.0|
+-------+----+

All wrong! Because santa's height is unknown, the whole row is lost, and the age statistics are computed from Sam and Cath only, even though santa's age is perfectly valid.
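For reference, if santa's row had loaded with height = NULL, the age statistics should have come out as follows (simple arithmetic over all three rows):

```scala
val ages  = Seq(30.0, 32.0, 70.0)   // sam, cath, santa -- all three rows
val count = ages.size                // 3, not 2
val mean  = ages.sum / ages.size     // 44.0, not 31.0
val min   = ages.min                 // 30.0
val max   = ages.max                 // 70.0, not 32.0
```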

My question is: what value would I need to put in the CSV for santa's height so that the row loads? I tried setting the schema to all StringType, which leads to the next question:

I found in the API that Spark can handle N/A values. So I thought I could load the data with every column set to StringType, do some cleanup, and only then cast the columns to the proper types, like this:

%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("identifier", StringType, true) ::
    StructField("name", StringType, true) ::
    StructField("height", StringType, true) ::
    StructField("age", StringType, true) ::
    Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
                        .schema(schema)
                        .option("header", "true")
                        .load("file:///home/spark_user/data2.csv")

// e.g. for each column of my DataFrame, replace empty strings with null
val cleaned = df.na.replace("*", Map("" -> null))

val toDouble = udf[Double, String]( _.toDouble )
val df2 = cleaned.withColumn("age", toDouble(cleaned("age")))

df2.describe("age").show()

But df.na.replace() throws an exception and aborts:

java.lang.IllegalArgumentException: Unsupported value type java.lang.String ().
        at org.apache.spark.sql.DataFrameNaFunctions.org$apache$spark$sql$DataFrameNaFunctions$$convertToDouble(DataFrameNaFunctions.scala:417)
        at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337)
        at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.DataFrameNaFunctions.replace0(DataFrameNaFunctions.scala:337)
        at org.apache.spark.sql.DataFrameNaFunctions.replace(DataFrameNaFunctions.scala:304)

Any help and hints are much appreciated!

[1] https://github.com/databricks/spark-csv


1 Answer


spark-csv does not have this option. It has been fixed in the master branch. I guess you should use that, or wait for the next stable release.
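With a build that includes the fix, the read might look roughly like this. Note the option name `treatEmptyValuesAsNulls` is an assumption based on later spark-csv releases and is not verified against any specific version:

```scala
// Option map for a build that includes the fix (option name is an assumption
// from later spark-csv releases, not verified against 1.1.0):
val csvOptions = Map(
  "header"                  -> "true",
  "treatEmptyValuesAsNulls" -> "true"
)

// val df = sqlContext.read.format("com.databricks.spark.csv")
//                         .schema(schema)
//                         .options(csvOptions)
//                         .load("file:///home/spark_user/data2.csv")
```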

Answered 2015-07-21T22:57:05.227