
Looking at the new Spark DataFrame API, it is unclear whether it is possible to modify dataframe columns.

How would I go about changing a value in row x column y of a dataframe?

In pandas this would be:

df.ix[x,y] = new_value

Edit: Consolidating what was said below, you can't modify the existing dataframe as it is immutable, but you can return a new dataframe with the desired modifications.

If you just want to replace a value in a column based on a condition, like np.where:

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
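For concreteness, here is a minimal sketch of that pattern on a throwaway DataFrame (the Spark 2.x SparkSession entry point, column, and values are invented for illustration):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('a',), ('b',)], ['update_col'])
df = df.withColumn('new_column_name',
                   F.when(F.col('update_col') == 'a', 'z')
                    .otherwise(F.col('update_col')))
# rows where update_col == 'a' now carry 'z' in new_column_name;
# every other row keeps its original value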

If you want to perform some operation on a column and create a new column that is added to the dataframe:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    # do stuff to the column value here, e.g. normalize the string
    transformed_value = col.strip().lower()
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))
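As a side note, the same registration is more commonly spelled with F.udf, which takes the function and return type in the same order:

my_udf = F.udf(my_func, T.StringType())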

If you want the new column to have the same name as the old column, you could add the additional step:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
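Alternatively, since withColumn replaces an existing column when given the same name, the drop/rename dance can be skipped by writing the result straight back:

df = df.withColumn('update_col', my_udf('update_col'))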

5 Answers


While you cannot modify a column as such, you can operate on a column and return a new DataFrame reflecting that change. For that you'd first create a UserDefinedFunction implementing the operation to apply, and then selectively apply that function to the targeted column only. In Python:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column
                         for column in old_df.columns])

new_df now has the same schema as old_df (assuming that old_df.target_column is of type StringType as well), but all values in column target_column will be new_value.
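A quick, hypothetical usage of the snippet above (the two-column DataFrame is invented; udf and name come from the code above):

old_df = sqlContext.createDataFrame([('x', 1), ('y', 2)], ['target_column', 'id'])
new_df = old_df.select(*[udf(column).alias(name) if column == name else column
                         for column in old_df.columns])
# target_column is 'new_value' in both rows; id passes through untouched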

answered 2015-03-25T13:35:02.113

Commonly when updating a column, we want to map an old value to a new value. Here's a way to do that in PySpark without UDFs:

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col] == old_value, new_value)
     .otherwise(df[update_col]))
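For instance, with a hypothetical status column (names and values invented), the same pattern reads:

df = df.withColumn('status',
    F.when(df['status'] == 'inactive', 'disabled')
     .otherwise(df['status']))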
answered 2015-12-21T22:23:26.957

DataFrames are based on RDDs. RDDs are immutable structures and do not allow updating elements in place. To change values, you will need to create a new DataFrame by transforming the original one, either using the SQL-like DSL or RDD operations like map.
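As a hedged Python sketch of that map route (the column names and the +1 arithmetic are invented; df.rdd exposes the underlying RDD):

new_df = sqlContext.createDataFrame(
    df.rdd.map(lambda row: (row['x'] + 1, row['y'])),
    df.schema)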

Highly recommended slide deck: Introducing DataFrames in Spark for Large Scale Data Science

answered 2015-03-17T21:51:45.187

As maasg says, you can create a new DataFrame from the result of a map applied to the old DataFrame. An example for a given DataFrame df with two rows:

val newDf = sqlContext.createDataFrame(df.map(row =>
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")))), df.schema)

Note that if the types of the columns change, you need to give it a correct schema instead of df.schema. Check out the API of org.apache.spark.sql.Row for available methods: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Update] Or, using a UDF in Scala:

import org.apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

And if the column name needs to stay the same, you can rename it back:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
answered 2015-11-08T21:19:36.520

Import col from pyspark.sql.functions and, based on the string values ("string a", "string b", "string c"), update the fifth column to integers (0, 1, 2) in a new DataFrame:

from pyspark.sql.functions import col, when 

data_frame_temp = data_frame.withColumn("col_5",
    when(col("col_5") == "string a", 0)
    .when(col("col_5") == "string b", 1)
    .otherwise(2))
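To sanity-check the mapping, a made-up input (a spark session is assumed to exist):

data_frame = spark.createDataFrame(
    [('string a',), ('string b',), ('string c',)], ['col_5'])
# after the withColumn above: 'string a' -> 0, 'string b' -> 1, anything else -> 2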
answered 2020-05-26T15:59:15.750