After some operations I end up with an RDD of Array[Any] (like the one below), where every value is of type Int except 3, 8 and 13, which are of type String.

Array[Array[Any]] = Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10), Array(11, 12, 13, 14, 15))

You can use the following code to reproduce it:

var exp = sc.parallelize(Array(Array(1,2,"3",4,5),Array(6,7,"8",9,10),Array(11,12,"13",14,15)))

Now I am trying to create a DataFrame from this array using a case class, with the column names and case class as follows:

case class specialchar(alpha:Int,beta:Int,gamma:String,theta:Int,zeta:Int) 

I need help with how to iterate over this RDD (an Array[Array[Any]] once collected) and store the rows in a DataFrame. Thanks in advance.

1 Answer

Helper functions for handling Any values:

def toInt(x: Any): Option[Int] = x match {
  case i: Int => Some(i)
  case _ => None
}

def toStr(x: Any): Option[String] = x match {
  case i: String => Some(i)
  case _ => None
}

Define the case class and convert the Array to a DataFrame:

var exp = sc.parallelize(Array(Array(1,2,"3",4,5),Array(6,7,"8",9,10),Array(11,12,"13",14,15)))
case class specialchar(alpha:Int,beta:Int,gamma:String,theta:Int,zeta:Int)  

// toDF needs spark.implicits._ (imported automatically in spark-shell)
var specialCharDf = Seq.empty[specialchar].toDF

exp.collect().foreach(x => {
    val a:Int = toInt(x(0)).getOrElse(1)
    val b:Int = toInt(x(1)).getOrElse(1)
    val c:String = toStr(x(2)).getOrElse("1")
    val d:Int = toInt(x(3)).getOrElse(1)
    val e:Int = toInt(x(4)).getOrElse(1)

    println(a, b, c, d, e)

    val specialcharTempDf =  Seq(specialchar(a,b,c,d,e)).toDF
    specialCharDf = specialcharTempDf.union(specialCharDf)
})

specialCharDf.printSchema() // matches the desired schema
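
The loop above collects the RDD to the driver and unions one single-row DataFrame per element. As a possible alternative, each row can be converted in a single pattern match. The following is a minimal sketch in plain Scala (no Spark required); SpecialChar, RowConversion and toSpecialChar are hypothetical names introduced here for illustration, with SpecialChar mirroring the specialchar case class from the question.

```scala
// Hypothetical name: mirrors the `specialchar` case class from the question.
case class SpecialChar(alpha: Int, beta: Int, gamma: String, theta: Int, zeta: Int)

object RowConversion {
  // Match the whole row at once. Rows with a different length or with
  // unexpected element types yield None instead of a silent default value.
  def toSpecialChar(row: Array[Any]): Option[SpecialChar] = row match {
    case Array(a: Int, b: Int, c: String, d: Int, e: Int) =>
      Some(SpecialChar(a, b, c, d, e))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    // Same sample data as in the question, held in a local array.
    val data: Array[Array[Any]] = Array(
      Array(1, 2, "3", 4, 5),
      Array(6, 7, "8", 9, 10),
      Array(11, 12, "13", 14, 15)
    )

    // Keep only the rows that matched the expected shape.
    val converted: List[SpecialChar] =
      data.toList.flatMap(row => toSpecialChar(row))

    converted.foreach(println)
  }
}
```

In spark-shell the same function could presumably be applied directly to the RDD, for example exp.flatMap(row => toSpecialChar(row)).toDF(), which would avoid collect() and the per-row union. That variant assumes spark.implicits._ is in scope and was not run here.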

EDIT: akhil mentioned that, in the end, all columns should be integers. The new solution is as follows:

var exp = sc.parallelize(Array(Array(1,2,"3",4,5),Array(6,7,"8",9,10),Array(11,12,"13",14,15)))
case class specialchar(alpha:Int,beta:Int,gamma:Int,theta:Int,zeta:Int)

var specialCharDf = Seq.empty[specialchar].toDF

exp.collect().foreach(x => {
    val a:Int = toInt(x(0)).getOrElse(1)
    val b:Int = toInt(x(1)).getOrElse(1)
    val c:String = toStr(x(2)).getOrElse("1")
    val f = c.toInt  // throws NumberFormatException if c is not a numeric string
    val d:Int = toInt(x(3)).getOrElse(1)
    val e:Int = toInt(x(4)).getOrElse(1)

    println(a, b, f, d, e)

    val specialcharTempDf =  Seq(specialchar(a,b,f,d,e)).toDF
    specialCharDf = specialcharTempDf.union(specialCharDf)
})

specialCharDf.printSchema() // matches the desired schema
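
The c.toInt step above throws if the string is not numeric, and toStr falls back to "1" when the third element is already an Int. One way to handle both cases is a single helper that accepts either type. This is a minimal sketch in plain Scala; GammaParsing and anyToInt are hypothetical names introduced here, not part of the original answer.

```scala
import scala.util.Try

object GammaParsing {
  // Accept an Int as-is, parse a numeric String, and return None for
  // anything else (including strings that are not valid integers).
  def anyToInt(x: Any): Option[Int] = x match {
    case i: Int    => Some(i)
    case s: String => Try(s.trim.toInt).toOption
    case _         => None
  }

  def main(args: Array[String]): Unit = {
    // A mix of values like those in the question, plus two bad inputs.
    val samples: List[Any] = List(1, "3", "13", "abc", 2.5)
    samples.foreach(v => println(s"$v -> ${anyToInt(v)}"))
  }
}
```

With a helper like this, the loop could use anyToInt(x(2)) for the gamma column and decide explicitly what to do with None, rather than relying on a default value.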
Answered 2019-06-19T14:38:06.553