1

我在文本文件中有重复的列,当我尝试使用 spark scala 代码加载该文本文件时,它成功加载到数据框中,我可以通过 df.Show() 看到前 20 行

完整代码:-

 val sc = new SparkContext(conf)
 val hivesql = new org.apache.spark.sql.hive.HiveContext(sc)
 val rdd = sc.textFile("/...FilePath.../*")
 val fieldCount = rdd.map(_.split("[|]")).map(x => x.size).first()
 val field = rdd.zipWithIndex.filter(_._2==0).map(_._1).first()
 val fields = field.split("[|]").map(fieldName =>StructField(fieldName, StringType, nullable=true))
 val schema = StructType(fields)
 val rowRDD = rdd.map(_.split("[|]")).map(attributes => getARow(attributes,fieldCount))

val df = hivesql.createDataFrame(rowRDD, schema)
df.registerTempTable("Sample_File")
df.Show()

到目前为止,我的代码工作正常。但是一旦我尝试下面的代码,它就会给我错误。

val results = hivesql.sql("Select id,sequence,sequence from Sample_File")

所以我在文本文件中有 2 个具有相同名称的列,即序列如何访问这两个列.. 我尝试使用序列#2 但仍然无法工作 Spark 版本:-1.6.0 Scala 版本:- 2.10.5

result of df.printschema()
|-- id: string (nullable = true)
|-- sequence: string (nullable = true)
|-- sequence: string (nullable = true)
4

2 回答 2

0

以下代码可能会帮助您解决问题。我在 Spark 1.6.3 中对此进行了测试。

val sc = new SparkContext(conf)
val hivesql = new org.apache.spark.sql.hive.HiveContext(sc)
val rdd = sc.textFile("/...FilePath.../*")
val fieldCount = rdd.map(_.split("[|]")).map(x => x.size).first()
val field = rdd.zipWithIndex.filter(_._2==0).map(_._1).first()
val fields = field.split("[|]").map(fieldName =>StructField(fieldName, StringType, nullable=true))
val schema = StructType(fields)
val rowRDD = rdd.map(_.split("[|]")).map(attributes => getARow(attributes,fieldCount))

val df = hivesql.createDataFrame(rowRDD, schema)

val colNames = Seq("id","sequence1","sequence2")
val df1 = df.toDF(colNames: _*)

df1.registerTempTable("Sample_File")

val results = hivesql.sql("select id,sequence1,sequence2 from Sample_File")

于 2020-08-03T12:08:58.413 回答
0

我支持@smart_coder 的方法,不过我的方法略有不同。请在下面找到它。

您需要具有唯一的列名才能从 hivesql.sql 进行查询。

您可以使用以下代码动态重命名列名:

你的代码:

val df = hivesql.createDataFrame(rowRDD, schema)

在这一点之后,我们需要消除歧义,下面是解决方案:

var list = df.schema.map(_.name).toList

for(i <- 0 to list.size -1){
    val cont = list.count(_ == list(i))
    val col = list(i)
    
    if(cont != 1){
        list = list.take(i) ++ List(col+i) ++ list.drop(i+1)
    }
}

val df1 = df.toDF(list: _*)

// 你会得到如下输出: df1.printschema() 的结果

|-- id: string (nullable = true)
|-- sequence1: string (nullable = true)
|-- sequence: string (nullable = true)

所以基本上,我们将所有列名作为一个列表,然后检查是否有任何列重复多次,如果列重复,我们将在列名附加索引,然后我们创建一个新的数据框 d1具有重命名列名的新列表。

我已经在 Spark 2.4 中对此进行了测试,但它也应该在 1.6 中工作。

于 2020-08-03T13:40:08.403 回答