3

unpivot当列数相当低并且列名可以硬编码时,我已经看到了一些火花数据帧的解决方案。您是否有一个可扩展的解决方案来取消旋转具有大量列的数据框?

下面是一个玩具问题。

输入:

  val df = Seq(
    (1,1,1,0),
    (2,0,0,1)    
  ).toDF("ID","A","B","C")

+---+--------+----+
| ID|  A | B | C  |
+---+--------+-----
|  1|  1 | 1 | 0  |
|  2|  0 | 0 | 1  |
+---+----------+--+

预期结果:

+---+-----+-----+
| ID|names|count|
+---+-----------|
|  1|  A  |  1  |
|  1|  B  |  1  |
|  1|  C  |  0  |
|  2|  A  |  0  |
|  2|  B  |  0  |
|  2|  C  |  1  |
+---+-----------+

该解决方案应该适用于具有 N 列要取消透视的数据集,其中 N 很大(例如 100 列)。

4

2 回答 2

8

这应该可行,我假设您知道要取消透视的列列表

import org.apache.spark.sql.{functions => func, _}

val df = Seq(
    (1,1,1,0),
    (2,0,0,1)    
  ).toDF("ID","A","B","C")

val cols = Seq("A", "B", "C")

df.select(
    $"ID",
    func.explode(
        func.array(
            cols.map(
                col =>
                    func.struct(    
                        func.lit(col).alias("names"),
                        func.col(col).alias("count")
                    )
            ): _*
        )
    ).alias("v")
)
.selectExpr("ID", "v.*")
于 2020-02-13T15:49:55.043 回答
4

这可以在纯 spark Sql 中通过堆叠列来完成。

下面是 pyspark 中的一个示例,可以很容易地适应 Scala。python代码仅与基于相关字段动态构造Sql相关。我经常使用这种方法。

from pyspark.sql.types import * 

df = spark.createDataFrame([
  {"id" : 1, "A" : 1, "B" : 1, "C" : 0},
  {"id" : 2, "A" : 0, "B" : 0, "C" : 1}
],
StructType([StructField("id", IntegerType()), StructField("A", IntegerType()),StructField("B", IntegerType()) , StructField("C", IntegerType())]))

def features_to_eav(df, subset=None):

  relevant_columns = subset if subset else df.columns
  n = len(relevant_columns)
  cols_to_stack = ", ".join(['\'{c}\', {c}'.format(c=c) for c in relevant_columns]) 
  stack_expression = "stack({}, {}) as (name, value)".format(n, cols_to_stack)

  fetures_to_check_df = df.select(*(["id"] + relevant_columns)).createOrReplaceTempView("features_to_check")

  sql = "select id, {} from features_to_check".format(stack_expression)
  print ("Stacking sql:", sql)
  return spark.sql(sql)

features_to_eav(df, ["A", "B", "C"]).show()

输出(注意构造的sql):

Stacking sql: select id, stack(3, 'A', A, 'B', B, 'C', C) as (name, value) from features_to_check
+---+----+-----+
| id|name|value|
+---+----+-----+
|  1|   A|    1|
|  1|   B|    1|
|  1|   C|    0|
|  2|   A|    0|
|  2|   B|    0|
|  2|   C|    1|
+---+----+-----+
于 2020-02-13T16:16:12.840 回答