给定一个 Spark 数据框,我想根据该列的非缺失值和非未知值计算列均值。然后我想取这个平均值并用它来替换列的缺失和未知值。
例如,假设我正在使用:
- 名为 df 的数据框,其中每条记录代表一个人,所有列都是整数或数字
- 名为年龄的列(每条记录的年龄)
- 名为 missing_age 的列(如果该人没有年龄,则等于 1,否则为 0)
- 名为 unknown_age 的列(如果该个人年龄未知,则等于 1,否则为 0)
然后我可以计算这个平均值,如下所示。
calc_mean = df.where((col("unknown_age") == 0) & (col("missing_age") == 0))
.agg(avg(col("age")))
或通过 SQL 和 windows 函数,
mean_compute = hiveContext.sql("select avg(age) over() as mean from df
where missing_age = 0 and unknown_age = 0")
如果可以的话,我不想使用 SQL/Windows函数。我的挑战是采用这种平均值并使用非 SQL 方法用它替换未知/缺失值。
我尝试过使用 when()、where()、replace()、withColumn、UDF 和组合...无论我做什么,我要么得到错误,要么结果不是我所期望的。这是我尝试过但不起作用的许多事情之一的示例。
imputed = df.when((col("unknown_age") == 1) | (col("missing_age") == 1),
calc_mean).otherwise("age")
我已经搜索了网络,但没有找到类似的插补类型问题,因此非常感谢任何帮助。这可能是我错过的非常简单的事情。
附注 - 我正在尝试将此代码应用于 Spark Dataframe 中列名中没有 unknown_ 或 missing_ 的所有列。我可以将 Spark 相关代码包装在 Python 的“for 循环”中并遍历所有适用的列来执行此操作吗?
更新:
还想出了如何遍历列...这是一个示例。
for x in df.columns:
if 'unknown_' not in x and 'missing_' not in x:
avg_compute = df.where(df['missing_' + x] != 1).agg(avg(x)).first()[0]
df = df.withColumn(x + 'mean_miss_imp', when((df['missing_' + x] == 1),
avg_compute).otherwise(df[x]))