I am using Spark 1.6.0 and Scala 2.10.5.

$ spark-shell --packages com.databricks:spark-csv_2.10:1.5.0

import org.apache.spark.sql.SQLContext   
import sqlContext.implicits._    
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val bankSchema = StructType(Array(
  StructField("age", IntegerType, true),
  StructField("job", StringType, true),
  StructField("marital", StringType, true),
  StructField("education", StringType, true),
  StructField("default", StringType, true),
  StructField("balance", IntegerType, true),
  StructField("housing", StringType, true),
  StructField("loan", StringType, true),
  StructField("contact", StringType, true),
  StructField("day", IntegerType, true),
  StructField("month", StringType, true),
  StructField("duration", IntegerType, true),
  StructField("campaign", IntegerType, true),
  StructField("pdays", IntegerType, true),
  StructField("previous", IntegerType, true),
  StructField("poutcome", StringType, true),
  StructField("y", StringType, true)))

val market_details = sqlContext.
  read.
  format("com.databricks.spark.csv").
  option("header", "true").
  schema(bankSchema).
  load("/user/sachnil.2007_gmail/Project1_dataset_bank-full.csv")    
market_details.registerTempTable("phone_table")    
val temp = sqlContext.sql("SELECT * FROM phone_table").show()

The error I get is:

17/05/14 06:11:42 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NumberFormatException: For input string: "58;"management";"married";"tertiary";"no";2143;"yes";"no";"unknown";5;"may";261;1;-1;0;"unknown";"no""
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
    at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:61)
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$2.apply(CsvRelation.scala:121)
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$2.apply(CsvRelation.scala:108)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)

The CSV content looks like this:

"age";"job";"marital";"education";"default";"balance";"housing";"loan";"contact";"day";"month";"duration";"campaign";"pdays";"previous";"poutcome";"y"
58;"management";"married";"tertiary";"no";2143;"yes";"no";"unknown";5;"may";261;1;-1;0;"unknown";"no"
44;"technician";"single";"secondary";"no";29;"yes";"no";"unknown";5;"may";151;1;-1;0;"unknown";"no"
33;"entrepreneur";"married";"secondary";"no";2;"yes";"yes";"unknown";5;"may";76;1;-1;0;"unknown";"no"
47;"blue-collar";"married";"unknown";"no";1506;"yes";"no";"unknown";5;"may";92;1;-1;0;"unknown";"no"

How can I solve this?

2 Answers

There seem to be two issues here:

  1. CSV delimiter

Your CSV data uses ; as the delimiter, so you should add the following

.option("delimiter", ";")

to instruct Spark's read operation to use the correct delimiter:

// Trailing dots keep the chained call pasteable into spark-shell.
val market_details = sqlContext.
  read.
  format("com.databricks.spark.csv").
  option("header", "true").
  option("delimiter", ";").
  schema(bankSchema).
  load("/user/sachnil.2007_gmail/Project1_dataset_bank-full.csv")

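For more information about the CSV format options, see the spark-csv documentation:

delimiter: by default columns are delimited using a comma (,), but the delimiter can be set to any character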
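
To see why a wrong delimiter leads to the NumberFormatException in the question, here is a minimal plain-Scala sketch. It does not use Spark or spark-csv at all; the object and value names are made up for illustration, and it only imitates the "split the row, then cast the first field to an integer" step.

```scala
import scala.util.Try

object DelimiterSketch {
  // First data row from the CSV sample in the question.
  val row: String =
    "58;\"management\";\"married\";\"tertiary\";\"no\";2143;\"yes\";\"no\";" +
    "\"unknown\";5;\"may\";261;1;-1;0;\"unknown\";\"no\""

  // Splitting on a comma finds no separator, so the whole row stays one field.
  val byComma: Array[String] = row.split(",", -1)

  // Splitting on the semicolon yields one field per schema column.
  val bySemicolon: Array[String] = row.split(";", -1)

  // Imitates the cast of the first column ("age") to an integer.
  val ageWithComma: Try[Int] = Try(byComma.head.toInt)         // fails
  val ageWithSemicolon: Try[Int] = Try(bySemicolon.head.toInt) // succeeds
}
```

With the comma, the "age" field is the entire line, which is exactly the input string shown in the exception message.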

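  2. Input data includes quotes (")

Your input data includes quotes (") that are not needed.
Please remove the " characters from the CSV input file and run it again (see the sample input below):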

age;job;marital;education;default;balance;housing;loan;contact;day;month;duration;campaign;pdays;previous;poutcome;y
58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no
44;technician;single;secondary;no;29;yes;no;unknown;5;may;151;1;-1;0;unknown;no
33;entrepreneur;married;secondary;no;2;yes;yes;unknown;5;may;76;1;-1;0;unknown;no
47;blue-collar;married;unknown;no;1506;yes;no;unknown;5;may;92;1;-1;0;unknown;no
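
If you decide to strip the quotes before loading, a naive preprocessing step could look like the sketch below. It is plain Scala with hypothetical helper names (unquote, unquoteLine), and it assumes that no field contains the ; delimiter or an escaped quote inside the quotes; a real CSV parser would be needed for such data.

```scala
object QuoteSketch {
  // Drops one pair of surrounding double quotes from a field, if present.
  def unquote(field: String): String =
    if (field.length >= 2 && field.startsWith("\"") && field.endsWith("\""))
      field.substring(1, field.length - 1)
    else field

  // Splits a line on ';', unquotes every field, and joins the fields again.
  // The -1 limit keeps trailing empty fields instead of dropping them.
  def unquoteLine(line: String): String =
    line.split(";", -1).map(unquote).mkString(";")

  // Example with the first three columns of the sample row.
  val example: String = unquoteLine("58;\"management\";\"married\"")
}
```

Applied line by line to the file, this would produce input in the unquoted form shown above.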

Here you can find spark-sql-csv-examples.

The Baby Names example uses the following CSV input (a header followed by sample rows, without quotes):

Year,First Name,County,Sex,Count
2013,GAVIN,ST LAWRENCE,M,9
2013,LEVI,ST LAWRENCE,M,9
2013,LOGAN,NEW YORK,M,44
answered 2017-05-14T08:25:42.003

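Spark 1.6.0 is so old that hardly anyone supports it these days (unless it is part of commercial support). I strongly recommend upgrading to 2.1.1, the latest version at the time of writing, which gives you many more options.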


Let me start with this: loading your CSV file just works in my custom 2.3.0-SNAPSHOT build, so I think you may have run into some unsupported spark-csv feature in the version you use.

Note that as of Spark 2+ the spark-csv module has been integrated into Spark itself (one of the many reasons to upgrade Spark).


If you insist on a custom schema (which you could let Spark work out by itself using the inferSchema option), at least use the DSL to save keystrokes:

// The $"..." syntax comes from spark.implicits._, which spark-shell imports automatically.
import org.apache.spark.sql.types._

val bankSchema = StructType(
  $"age".int ::
  $"job".string ::
  $"marital".string ::
  $"education".string ::
  $"default".string ::
  $"balance".int ::
  $"housing".string ::
  $"loan".string ::
  $"contact".string ::
  $"day".int ::
  $"month".string ::
  $"duration".int ::
  $"campaign".int ::
  $"pdays".int ::
  $"previous".int ::
  $"poutcome".string ::
  $"y".string ::
  Nil)

scala> println(bankSchema.treeString)
root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)

If you develop Spark applications in Scala, I strongly recommend using a case class and leveraging encoders to describe the schema (again, Spark 2+).

case class Market(
  age: Int,
  job: String,
  marital: String,
  education: String,
  default: String,
  balance: Int,
  housing: String,
  loan: String,
  contact: String,
  day: Int,
  month: String,
  duration: Int,
  campaign: Int,
  pdays: Int,
  previous: Int,
  poutcome: String,
  y: String)
import org.apache.spark.sql.Encoders
scala> val bankSchema = Encoders.product[Market]
java.lang.UnsupportedOperationException: `default` is a reserved keyword and cannot be used as field name
- root class: "Market"
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:611)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:609)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:609)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:440)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
  ... 48 elided

(In this particular case that is not possible, because default is a reserved keyword, which you may want to avoid in your hand-built schema too.)
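
One possible way around the reserved keyword, sketched here under assumptions, is to give that one field a different name in the case class. The names MarketRow, hasDefault and EncoderSchemaSketch are hypothetical, and the sketch assumes Spark 2.x on the classpath (as in spark-shell). Encoders.product and Encoder.schema are regular Spark APIs; whether the renamed column suits your later queries is up to you.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

// Same columns as Market, except that the fifth field is called
// hasDefault (hypothetical name) instead of the reserved word default.
case class MarketRow(
  age: Int, job: String, marital: String, education: String,
  hasDefault: String, balance: Int, housing: String, loan: String,
  contact: String, day: Int, month: String, duration: Int,
  campaign: Int, pdays: Int, previous: Int, poutcome: String, y: String)

object EncoderSchemaSketch {
  // Derive an encoder from the case class and take its schema.
  val marketSchema: StructType = Encoders.product[MarketRow].schema
}
```

Passing EncoderSchemaSketch.marketSchema to schema(...) should then name the fifth column hasDefault rather than default.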


Once you have the schema, reading the sample you included in your question gives no errors:

val marketDetails = spark.
  read.
  schema(bankSchema).
  option("header", true).
  option("delimiter", ";").
  csv("market_details.csv")

scala> marketDetails.show
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+

What I really like about Spark SQL is that you can stick to pure SQL if that is your preferred "language" in Spark.

val q = """
  CREATE OR REPLACE TEMPORARY VIEW phone_table
  USING csv
  OPTIONS (
    inferSchema true,
    header true,
    delimiter ';',
    path 'market_details.csv')"""

// execute the above query and discard the result
// we're only interested in the side effect of creating a temp view
sql(q).collect

scala> sql("select * from phone_table").show
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+

Tip: with spark-sql you can leave Scala aside entirely.

answered 2017-05-14T11:39:53.920