3

为了处理我拥有的数据,我之前提取了模式,因此当我读取数据集时,我提供了模式,而不是通过推断模式的昂贵步骤。

为了构造模式,我需要将几个不同的模式合并到最终的模式中,所以我一直在使用union (++)anddistinct方法,但我一直收到org.apache.spark.sql.AnalysisException: Duplicate column(s)异常。

例如,假设我们在以下结构中有两个模式:

val schema1 = StructType(StructField("A", StructType(
    StructField("i", StringType, true) :: Nil
    ), true) :: Nil)

val schema2 = StructType(StructField("A", StructType(
    StructField("i", StringType, true) :: Nil
    ), true) :: Nil)

val schema3 = StructType(StructField("A", StructType(
    StructField("i", StringType, true) ::
    StructField("ii", StringType, true) :: Nil
    ), true) :: Nil)

val final_schema = (schema1 ++ schema2 ++ schema3).distinct

println(final_schema)

输出:

StructType(
    StructField(A,StructType(
         StructField(i,StringType,true)),true), 
    StructField(A,StructType(
        StructField(i,StringType,true),    
        StructField(ii,StringType,true)),true))

我知道只有与另一个模式完全匹配的模式结构才会被distinct. 但是我希望结果看起来像这样:

StructType(
    StructField(A,StructType(
        StructField(i,StringType,true),    
        StructField(ii,StringType,true)),true))

其中所有的都被“组合”成一个模式。我已经筛选了scala 文档中的所有方法,但似乎找不到解决此问题的正确方法。有任何想法吗?

编辑:

最终目标是使用方法final_schema输入sqlContext.read.schema和读取 JSON 字符串的 RDD 。read

4

2 回答 2

0

尝试这样的事情:

(schema1 ++ schema2 ++ schema3).groupBy(getKey).map(_._2.head)

wheregetKey是一个从模式到要考虑合并的属性的函数(例如列名或子字段的名称)。在map函数中,您可以采取头部或使用一些更精细的函数来保持特定的模式。

于 2016-12-28T07:19:45.060 回答
0

Spark 与 Scala:

val consolidatedSchema = test1Df.schema.++:(test2Df.schema).toSet
val uniqueConsolidatedSchemas = StructType(consolidatedSchema.toSeq)

使用 Java 的 Spark:

StructType consolidatedSchema = test1Df.schema().merge(test2Df.schema());
于 2020-07-14T19:21:19.220 回答