9

所以我试图加载 csv 文件来推断自定义模式,但每次我最终都会出现以下错误:

Project_Bank.csv 不是 Parquet 文件。尾部的预期幻数 [80, 65, 82, 49] 但发现 [110, 111, 13, 10]

这就是我的程序和我的 csv 文件条目的样子,

年龄;工作;婚姻;教育;默认;余额;住房;贷款;联系人;天;月;持续时间;竞选活动;pdays;以前;poutcome;y 58;管理;已婚;高等教育;否;2143;是;否;未知;5;可能;261;1;-1;0;未知;无 44;技术员;单身;中学;无;29;是;无;未知;5;可能;151;1;-1;0;未知;不 33;企业家;已婚;中学;否;2;是;是;未知;5;可能;76;1;-1;0;未知;否

我的代码:

$spark-shell --packages com.databricks:spark-csv_2.10:1.5.0

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext   
import sqlContext.implicits._    
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val bankSchema = StructType(Array(
  StructField("age", IntegerType, true),
  StructField("job", StringType, true),
  StructField("marital", StringType, true),
  StructField("education", StringType, true),
  StructField("default", StringType, true),
  StructField("balance", IntegerType, true),
  StructField("housing", StringType, true),
  StructField("loan", StringType, true),
  StructField("contact", StringType, true),
  StructField("day", IntegerType, true),
  StructField("month", StringType, true),
  StructField("duration", IntegerType, true),
  StructField("campaign", IntegerType, true),
  StructField("pdays", IntegerType, true),
  StructField("previous", IntegerType, true),
  StructField("poutcome", StringType, true),
  StructField("y", StringType, true)))


 val df = sqlContext.
  read.
  schema(bankSchema).
  option("header", "true").
  option("delimiter", ";").
  load("/user/amit.kudnaver_gmail/hadoop/project_bank/Project_Bank.csv").toDF()

  df.registerTempTable("people")
  df.printSchema()
  val distinctage = sqlContext.sql("select distinct age from people")

任何关于为什么在推送正确的模式后无法在此处使用 csv 文件的建议。提前感谢您的建议。

谢谢阿米特 K

4

2 回答 2

4

这里的问题是数据框在处理它时需要 Parquet 文件。为了处理 CSV 中的数据。在这里你可以做什么。

首先,从数据中删除标题行。

58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no
44;technician;single;secondary;no;29;yes;no;unknown;5;may;151;1;-1;0;unknown;no
33;entrepreneur;married;secondary;no;2;yes;yes;unknown;5;may;76;1;-1;0;unknown;no

接下来我们编写以下代码来读取数据。

创建案例类

case class BankSchema(age: Int, job: String, marital:String, education:String, default:String, balance:Int, housing:String, loan:String, contact:String, day:Int, month:String, duration:Int, campaign:Int, pdays:Int, previous:Int, poutcome:String, y:String)

从 HDFS 读取数据并解析

val bankData = sc.textFile("/user/myuser/Project_Bank.csv").map(_.split(";")).map(p => BankSchema(p(0).toInt, p(1), p(2),p(3),p(4), p(5).toInt, p(6), p(7), p(8), p(9).toInt, p(10), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toInt, p(15), p(16))).toDF()

然后注册表并执行查询。

bankData.registerTempTable("bankData")
val distinctage = sqlContext.sql("select distinct age from bankData")

这是输出的样子

+---+
|age|
+---+
| 33|
| 44|
| 58|
+---+
于 2017-05-23T08:23:09.273 回答
4

这里预期的文件格式是csv,但根据错误它正在寻找parquet文件格式。

这可以通过明确提及以下文件格式(共享问题中缺少)来克服,因为如果我们不指定文件格式,则默认情况下它需要Parquet格式。

根据 Java 代码版本(示例):

Dataset<Row> resultData = session.read().format("csv")
                                            .option("sep", ",")
                                            .option("header", true)
                                            .option("mode", "DROPMALFORMED")
                                            .schema(definedSchema)
                                            .load(inputPath);

在这里,可以使用 ajava class (ie. POJO class)StructType前面提到的 using 来定义模式。inputPath 是输入csv文件的路径。

于 2018-09-20T03:16:52.657 回答