我是 scala(2.11) 和 spark (1.6.0) 的新程序员,他试图在没有 spark-csv 包的情况下将 RDD 转换为 DF(用于练习,但也因为一些技术问题)。在阅读了 Spark 的入门指南和 stackoverflow 的所有相关帖子之后,我不知道如何使一些方法(4)起作用——只有一个对我有用,我不知道为什么——:
对他们中的任何一个的每一次帮助都将是惊人的!
我有一个简单的表格,例如 txt 文件:
Jorgito 10 1 Soltero
Juanito 20 2 Casado
Jaimito 30 3 Divociado
我编写了一些初步代码:
var RDD_filas = RDD_datos.map(_.split("\t"))
var esquema = new StructType()
.add("Nombre", StringType)
.add("Edad", IntegerType)
.add("Hijos",IntegerType)
.add("EC",StringType)
import org.apache.spark.sql._
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
import org.apache.spark.sql.SQLContext
case class X(Nombre: String, Edad: Int, Hijos: Int, EC: String)
然后,我应用了我见过的所有不起作用的方法:
var DF_datos = RDD_filas.map({case Array(s0, s1, s2, s3) => X(s0, s1.trim.toInt, s2.trim.toInt, s3)}).toDF("Nombre","Edad","Hijos","EC")
var DF_datos2 = RDD_filas.map(p => X(p(0), p(1).trim.toInt,p(2).trim.toInt,p(3))).toDF("Nombre","Edad","Hijos","EC")
var DF_datos3 = RDD_filas.map(Array(s0, s1, s2, s3) => Array(s0, s1.trim.toInt, s2.trim.toInt, s3)).toDF("Nombre","Edad","Hijos","EC")
var DF_datos4 = sqlContext.createDataFrame(RDD_filas,esquema)
前三种方法允许我创建 DF 并打印它们的模式,但它们没有标题(DF_datos.header() 返回第一行),如果我尝试 DF_datos.show() 会出错一个(对我来说)是第 4 位,因为它应该是最“规范”的方式。
只有这对我有用:
var a = RDD_datos.map(_.split(" ")).take(3)
val rdd = sc.makeRDD(a)
val df = rdd.map {case Array(s0, s1, s2, s3) => X(s0, s1.toInt, s2.toInt, s3)}.toDF()