
I have a PySpark function that I need to convert to Scala.

PySpark:

from pyspark.sql import functions as F

for i in [c for c in r.columns if c.startswith("_")]:
    r = r.withColumn(i, F.col(i)["id"])

Since Scala values are immutable, is there a better way in Scala to create multiple new columns without chaining `val df1 = df.withColumn(...)`, `val df2 = df1.withColumn(...)`, the way I reassign in PySpark?

The table `r` looks like this:

+-----------+-------------+-------------+-------------+-------------+
|         _0|           _1|           _2|           _3|           _4|
+-----------+-------------+-------------+-------------+-------------+
|[1, Carter]|   [5, Banks]|[11, Derrick]|    [4, Hood]|    [12, Jef]|
|[1, Carter]|    [12, Jef]|    [4, Hood]|   [5, Banks]|[11, Derrick]|
|[1, Carter]|    [4, Hood]|    [12, Jef]|[11, Derrick]|   [5, Banks]|
|[1, Carter]|    [12, Jef]|   [5, Banks]|[11, Derrick]|    [4, Hood]|
|[1, Carter]|    [4, Hood]|    [12, Jef]|   [5, Banks]|[11, Derrick]|
|[1, Carter]|[11, Derrick]|    [12, Jef]|    [4, Hood]|   [5, Banks]|
|[1, Carter]|    [12, Jef]|[11, Derrick]|   [5, Banks]|    [4, Hood]|
|[1, Carter]|   [5, Banks]|    [4, Hood]|[11, Derrick]|    [12, Jef]|
|[1, Carter]|[11, Derrick]|   [5, Banks]|    [4, Hood]|    [12, Jef]|
|[1, Carter]|   [5, Banks]|[11, Derrick]|    [12, Jef]|    [4, Hood]|
|[1, Carter]|   [5, Banks]|    [12, Jef]|[11, Derrick]|    [4, Hood]|
|[1, Carter]|   [5, Banks]|    [12, Jef]|    [4, Hood]|[11, Derrick]|
|[1, Carter]|[11, Derrick]|   [5, Banks]|    [12, Jef]|    [4, Hood]|
|[1, Carter]|    [4, Hood]|[11, Derrick]|   [5, Banks]|    [12, Jef]|
|[1, Carter]|[11, Derrick]|    [4, Hood]|   [5, Banks]|    [12, Jef]|
|[1, Carter]|    [12, Jef]|   [5, Banks]|    [4, Hood]|[11, Derrick]|
|[1, Carter]|    [12, Jef]|[11, Derrick]|    [4, Hood]|   [5, Banks]|
|[1, Carter]|    [4, Hood]|[11, Derrick]|    [12, Jef]|   [5, Banks]|
|[1, Carter]|[11, Derrick]|    [4, Hood]|    [12, Jef]|   [5, Banks]|
|[1, Carter]|    [12, Jef]|    [4, Hood]|[11, Derrick]|   [5, Banks]|
+-----------+-------------+-------------+-------------+-------------+

2 Answers


You can use `foldLeft`:


import org.apache.spark.sql.functions.col

// Fold over the matching column names, threading the DataFrame through as the accumulator
val updDf = df.columns
  .filter(_.startsWith("_"))
  .foldLeft(df)((df, c) => df.withColumn(s"new_$c", col(c).getItem("id")))
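The `foldLeft` accumulation pattern itself is plain Scala, independent of Spark. A minimal sketch (using a `Map` as a stand-in for the DataFrame accumulator; the names are illustrative only):

```scala
object FoldLeftSketch extends App {
  // Thread an accumulator through a list of column names,
  // applying one transformation per step - same shape as
  // folding withColumn calls over a DataFrame.
  val columns = Seq("_0", "_1", "other")

  val result = columns
    .filter(_.startsWith("_"))
    .foldLeft(Map.empty[String, String])((acc, c) => acc + (s"new_$c" -> s"$c.id"))

  println(result)
}
```

Each step receives the accumulator produced by the previous step, so no intermediate `val`s are needed.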

answered 2021-07-14T12:48:00.723

You can do it with a single `select` (each `.withColumn` call creates a new Dataset that the analyzer has to resolve):

// either replace with the internal id column, or take as is
val updates = r.columns.map(c => if (c.startsWith("_")) col(s"$c.id") as c else col(c))

val newDf = r.select(updates:_*)  // _* expands the Sequence into a parameter list
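The `: _*` ascription used above is plain Scala varargs syntax, not Spark-specific. A small sketch (the `joinAll` helper is hypothetical, purely for illustration):

```scala
object VarargsSketch extends App {
  // A Seq can be passed to a variadic parameter by ascribing it with `: _*`,
  // which expands the sequence into individual arguments.
  def joinAll(parts: String*): String = parts.mkString(" | ")

  val updates = Seq("_0.id", "_1.id", "_2.id")
  println(joinAll(updates: _*))  // _0.id | _1.id | _2.id
}
```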

answered 2021-07-14T12:44:46.807