
I'm trying to create a Row (org.apache.spark.sql.catalyst.expressions.Row) based on user input, but I can't find a way to construct a Row dynamically.

Is there any functionality to create a Row from a List or Array?

For example, if I have a .csv file with lines in the following format,

"91xxxxxxxxxx,21.31,15,0,0"

If the user inputs [1, 2], then I need to take only the 2nd and 3rd columns, along with the customer_id, which is the first column.

I tried to parse it with this code:

val l3 = sc.textFile("/SparkTest/abc.csv").map(line => foo(input, line))

where foo is defined as

def foo(input: List[Int], s: String): Row = {
    val n = input.length
    val out = new Array[Any](n + 1)
    val r = s.split(",")
    out(0) = r(0)
    for (i <- 1 to n)
        out(i) = r(input(i - 1)).toDouble
    Row(out)   // Row(out) treats the whole array as a single field, hence the output below
}

and input is a List, say

val input = List(1,2)

Executing this code, I get l3 as:

Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@234d2916])

But what I want is:

Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([91xxxxxxxxxx,21.31,15])

This has to be passed to create a schema in Spark SQL.
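For context, "create a schema" here means the Programmatically Specifying Schema pattern from the Spark SQL guide. A minimal sketch of what the rows would feed into (the column names and types are hypothetical placeholders):

import org.apache.spark.sql._

val sqlContext = new SQLContext(sc)
// hypothetical names: a string customer_id plus the two selected numeric columns
val schema = StructType(Seq(
  StructField("customer_id", StringType, true),
  StructField("col1", DoubleType, true),
  StructField("col2", DoubleType, true)))
val schemaRDD = sqlContext.applySchema(l3, schema)   // l3 must be an RDD[Row]
schemaRDD.registerTempTable("customers")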


3 Answers


Something like the following should work:

import org.apache.spark.sql._

def f(n: List[Int], s: String): Row =
  // split the line on commas, then keep only the values whose column index appears in n
  Row.fromSeq(s.split(",").zipWithIndex.collect { case (a, b) if n.contains(b) => a }.toSeq)
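For example, wired into the pipeline from the question, it could look like the sketch below; prepending 0 to input is my addition, since f only keeps the indices listed in n and the customer_id column would otherwise be dropped:

val input = List(1, 2)
val l3 = sc.textFile("/SparkTest/abc.csv").map(line => f(0 :: input, line))
// l3.collect() => Array([91xxxxxxxxxx,21.31,15])
// note: every value stays a String here; add .toDouble where numeric fields are needed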
answered 2015-01-23T14:26:38.097

You are missing the creation of the StructField and StructType. Refer to the official guide, http://spark.apache.org/docs/latest/sql-programming-guide.html, section Programmatically Specifying Schema.

I'm not a Scala expert, but in Python it would look like this:

from pyspark.sql import *
sqlContext = SQLContext(sc)

input = [1,2]

def parse(line):
    """Keep the first column plus the columns whose indices are listed in `input`."""
    global input
    l = line.split(',')
    res = [l[0]]
    for ind in input:
        res.append(l[ind])
    return res

csv  = sc.textFile("file:///tmp/inputfile.csv")
rows = csv.map(lambda x: parse(x))

# one column for the leading customer_id plus one per selected index
fieldnum = len(input) + 1
fields = [StructField("col" + str(i), StringType(), True) for i in range(fieldnum)]
schema = StructType(fields)

csvWithSchema = sqlContext.applySchema(rows, schema)
csvWithSchema.registerTempTable("test")
sqlContext.sql("SELECT * FROM test").collect()

In short, you should not convert the records to Row objects directly; just keep them as an RDD and apply the schema to it with applySchema.
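For the question's Scala context, a rough equivalent of the Python above might look like the sketch below (written against the Spark 1.x API of the time; on Spark 1.3+ createDataFrame replaces applySchema):

import org.apache.spark.sql._

val sqlContext = new SQLContext(sc)
val input = List(1, 2)

// keep the first column plus the columns selected by input, all as strings
val rows = sc.textFile("file:///tmp/inputfile.csv").map { line =>
  val l = line.split(",")
  Row.fromSeq(l(0) +: input.map(l(_)))
}

// one nullable string field per kept column
val schema = StructType((0 to input.length).map(i => StructField("col" + i, StringType, true)))

val csvWithSchema = sqlContext.applySchema(rows, schema)
csvWithSchema.registerTempTable("test")
sqlContext.sql("SELECT * FROM test").collect()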

answered 2015-01-23T14:32:30.260

You can also try:

    Row.fromSeq(Seq(line(0), line(1).toDouble, line(2).toDouble) ++ line.drop(3).map(value => value.toString))
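Applied to one of the sample lines, assuming line is the Array[String] produced by splitting on commas, this gives:

import org.apache.spark.sql.Row

val line = "91xxxxxxxxxx,21.31,15,0,0".split(",")
val row = Row.fromSeq(Seq(line(0), line(1).toDouble, line(2).toDouble) ++ line.drop(3))
// row: [91xxxxxxxxxx,21.31,15.0,0,0] -- a string id, two doubles, remaining columns as strings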
answered 2017-11-18T11:31:18.330