我正在尝试使用 Apache Zeppelin 笔记本将 CSV 文件加载到带有 spark-csv [1] 的 Spark 数据帧中,当加载没有值的数字字段时,解析器对该行失败并且该行被跳过。
我本来希望该行被加载并且数据框中的值加载该行并将值设置为NULL,以便聚合忽略该值。
%dep
z.reset()
z.addRepo("my-nexus").url("<my_local_nexus_repo_that_is_a_proxy_of_public_repos>")
z.load("com.databricks:spark-csv_2.10:1.1.0")
%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._
val schema = StructType(
StructField("identifier", StringType, true) ::
StructField("name", StringType, true) ::
StructField("height", DoubleType, true) ::
Nil)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
.schema(schema)
.option("header", "true")
.load("file:///home/spark_user/data.csv")
df.describe("height").show()
这是数据文件的内容:/home/spark_user/data.csv
identifier,name,height
1,sam,184
2,cath,180
3,santa, <-- note that there is not height recorded for Santa !
这是输出:
+-------+------+
|summary|height|
+-------+------+
| count| 2| <- 2 of 3 lines loaded, ie. sam and cath
| mean| 182.0|
| stddev| 2.0|
| min| 180.0|
| max| 184.0|
+-------+------+
在 zeppelin 的日志中,我可以在解析 santa 的行时看到以下错误:
ERROR [2015-07-21 16:42:09,940] ({Executor task launch worker-45} CsvRelation.scala[apply]:209) - Exception while parsing line: 3,santa,.
java.lang.NumberFormatException: empty String
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)
at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
at java.lang.Double.parseDouble(Double.java:538)
at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
at scala.collection.immutable.StringOps.toDouble(StringOps.scala:31)
at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:42)
at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:198)
at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:180)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:129)
at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
所以到目前为止你可能会告诉我这么好......你是对的;)
现在我想添加一个额外的列,比如年龄,我总是在那个字段中有数据。
identifier,name,height,age
1,sam,184,30
2,cath,180,32
3,santa,,70
现在礼貌地询问一些关于年龄的统计数据:
%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._
val schema = StructType(
StructField("identifier", StringType, true) ::
StructField("name", StringType, true) ::
StructField("height", DoubleType, true) ::
StructField("age", DoubleType, true) ::
Nil)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
.schema(schema)
.option("header", "true")
.load("file:///home/spark_user/data2.csv")
df.describe("age").show()
结果
+-------+----+
|summary| age|
+-------+----+
| count| 2|
| mean|31.0|
| stddev| 1.0|
| min|30.0|
| max|32.0|
+-------+----+
都错了!由于圣诞老人的身高未知,整条线都丢失了,年龄的计算仅基于 Sam 和 Cath,而圣诞老人的年龄完全有效。
我的问题是我需要插入圣诞老人的高度以便加载 CSV 的值是多少。我试图将架构设置为全部 StringType 但随后
下一个问题更多关于
我在 API 中发现可以使用 spark 处理 N/A 值。所以我想也许我可以加载所有列设置为 StringType 的数据,然后进行一些清理,然后只正确设置架构,如下所示:
%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._
val schema = StructType(
StructField("identifier", StringType, true) ::
StructField("name", StringType, true) ::
StructField("height", StringType, true) ::
StructField("age", StringType, true) ::
Nil)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv").schema(schema).option("header", "true").load("file:///home/spark_user/data.csv")
// eg. for each column of my dataframe, replace empty string by null
df.na.replace( "*", Map("" -> null) )
val toDouble = udf[Double, String]( _.toDouble)
df2 = df.withColumn("age", toDouble(df("age")))
df2.describe("age").show()
但是 df.na.replace() 抛出异常并停止:
java.lang.IllegalArgumentException: Unsupported value type java.lang.String ().
at org.apache.spark.sql.DataFrameNaFunctions.org$apache$spark$sql$DataFrameNaFunctions$$convertToDouble(DataFrameNaFunctions.scala:417)
at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337)
at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.DataFrameNaFunctions.replace0(DataFrameNaFunctions.scala:337)
at org.apache.spark.sql.DataFrameNaFunctions.replace(DataFrameNaFunctions.scala:304)
非常感谢任何帮助和提示!