3

这个问题讨论了如何链接自定义 PySpark 2 转换。

DataFrame #transform 方法已添加到 PySpark 3 API。

此代码片段显示了一个不带参数且按预期工作的自定义转换,以及另一个带参数但不工作的自定义转换。

from pyspark.sql.functions import col, lit

df = spark.createDataFrame([(1, 1.0), (2, 2.)], ["int", "float"])

def with_funny(word):
    def inner(df):
        return df.withColumn("funny", lit(word))
    return inner

def cast_all_to_int(input_df):
    return input_df.select([col(col_name).cast("int") for col_name in input_df.columns])

df.transform(with_funny("bumfuzzle")).transform(cast_all_to_int).show()

这是输出的内容:

+---+-----+-----+
|int|float|funny|
+---+-----+-----+
|  1|    1| null|
|  2|    2| null|
+---+-----+-----+

应该如何定义该with_funny()方法以输出 PySpark 3 API 的值?

4

1 回答 1

4

如果我理解,您的第一个转换方法将添加一个带有作为参数传递的文字的新列,最后一个转换将所有列转换为 int 类型,对吗?

将字符串转换为 int 将返回 null 值,您的最​​终输出是正确的:

from pyspark.sql.functions import col, lit

df = spark.createDataFrame([(1, 1.0), (2, 2.)], ["int", "float"])

def with_funny(word):
    def inner(df):
        return df.withColumn("funny", lit(word))
    return inner

def cast_all_to_int(input_df):
    return input_df.select([col(col_name).cast("int") for col_name in input_df.columns])

#first transform
df1 = df.transform(with_funny("bumfuzzle"))
df1.show()

#second transform
df2 = df1.transform(cast_all_to_int)
df2.show()

#all together
df_final = df.transform(with_funny("bumfuzzle")).transform(cast_all_to_int)
df_final.show()

输出:

+---+-----+---------+
|int|float|    funny|
+---+-----+---------+
|  1|  1.0|bumfuzzle|
|  2|  2.0|bumfuzzle|
+---+-----+---------+

+---+-----+-----+
|int|float|funny|
+---+-----+-----+
|  1|    1| null|
|  2|    2| null|
+---+-----+-----+

+---+-----+-----+
|int|float|funny|
+---+-----+-----+
|  1|    1| null|
|  2|    2| null|
+---+-----+-----+

也许您想要的是像这样切换转换的顺序:

df_final = df.transform(cast_all_to_int).transform(with_funny("bumfuzzle"))
df_final.show()

输出:

+---+-----+---------+
|int|float|    funny|
+---+-----+---------+
|  1|    1|bumfuzzle|
|  2|    2|bumfuzzle|
+---+-----+---------+
于 2020-08-27T08:12:08.520 回答