4

我有一个 Spark 数据框,其中包含Date,GroupPrice列。

我正在尝试在 Python 中导出该数据框percentile(0.6)的列。Price此外,我需要将输出添加为新列。

我尝试了下面的代码:

perudf = udf(lambda x: x.quantile(.6))
df1 = df.withColumn("Percentile", df.groupBy("group").agg("group"),perudf('price'))

但它抛出以下错误:

assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"
AssertionError: all exprs should be Column
4

3 回答 3

4

您可以使用 sql 使用“percentile_approx”。在 pyspark 中创建 UDF 很困难。

有关其他详细信息,请参阅此链接:https ://mail-archives.apache.org/mod_mbox/spark-user/201510.mbox/%3CCALte62wQV68D6J87EVq6AD5-T3D0F3fHjuzs+1C5aCHOUUGS8w@mail.gmail.com%3E

于 2016-05-03T15:04:42.073 回答
3

您可以使用窗口函数,只需定义一个聚合窗口(您的案例中的所有数据),然后按百分位值过滤:

from pyspark.sql.window import Window
from pyspark.sql.functions import percent_rank

w =  Window.orderBy(df.price)
df.select('price', percent_rank().over(w).alias("percentile"))\
    .where('percentile == 0.6').show()

percent_rankpyspark.sql.functions中可用

如果您愿意,可以使用此databricks 帖子中的 SQL 接口

于 2017-01-25T14:56:20.467 回答
2

我知道使用 RDD 获取每一行的百分位数的解决方案。首先,将您的 RDD 转换为 DataFrame:

# convert to rdd of dicts
rdd = df.rdd
rdd = rdd.map(lambda x: x.asDict())

然后,您可以计算每一行的百分位数:

column_to_decile = 'price'
total_num_rows = rdd.count()


def add_to_dict(_dict, key, value):
    _dict[key] = value
    return _dict


def get_percentile(x, total_num_rows):
    _dict, row_number = x
    percentile = x[1] / float(total_num_rows)
    return add_to_dict(_dict, "percentile", percentile)


rdd_percentile = rdd.map(lambda d: (d[column_to_decile], d)) # make column_to_decile a key
rdd_percentile = rdd_percentile.sortByKey(ascending=False) # so 1st decile has largest
rdd_percentile = rdd_percentile.map(lambda x: x[1]) # remove key
rdd_percentile = rdd_percentile.zipWithIndex() # append row number
rdd_percentile = rdd_percentile.map(lambda x: get_percentile(x, total_num_rows))

最后,转换回 DataFrame:

df = sqlContext.createDataFrame(rdd_percentile)

要获得与 0.6 最接近的百分位数的行,您可以执行以下操作:

from pyspark.sql.types import *
from pyspark.sql.functions import udf


def get_row_with_percentile(df, percentile):
    func = udf(lambda x: abs(x), DoubleType())
    df_distance = df.withColumn("distance", func(df['percentile'] - percentile))
    min_distance = df_distance.groupBy().min('distance').collect()[0]['min(distance)']
    result = df_distance.filter(df_distance['distance'] == min_distance)
    result.drop("distance")
    return result


get_row_with_percentile(df, 0.6).show()
于 2016-05-03T22:00:22.543 回答