0

我正在尝试使用combineByKey为我的分配找到每个键的中值(使用combineByKey是分配的要求),并且我计划使用以下函数返回与同一键关联的所有值列表的对(k, v)v = a之后,我计划对值进行排序,然后找到中位数。

data = sc.parallelize([('A',2), ('A',4), ('A',9), ('A',3), ('B',10), ('B',20)])

rdd = data.combineByKey(lambda value: value, lambda c, v: median1(c,v), lambda c1, c2: median2(c1,c2))

def median1 (c,v):
    list = [c]
    list.append(v)
    return list

def median2 (c1,c2):
    list2 = [c1]
    list2.append(c2)
    return list2

但是,我的代码给出的输出如下:

[('A', [[2, [4, 9]], 3]), ('B', [10, 20])]

其中 value 是一个嵌套列表。无论如何我可以取消 pyspark 中的值以获得

[('A', [2, 4, 9, 3]), ('B', [10, 20])]

还是有其他方法可以找到每个键的中位数combineByKey?谢谢!

4

2 回答 2

0

collect_list在数据框列上使用起来更容易。

from pyspark.sql.functions import collect_list

df = rdd.toDF(['key', 'values'])

key_lists = df.groupBy('key').agg(collect_list('values').alias('value_list'))
于 2018-06-11T17:06:10.910 回答
0

你只是没有从价值中得到一个好的组合器。

这是你的答案:

data = sc.parallelize([('A',2), ('A',4), ('A',9), ('A',3), ('B',10), ('B',20)])

def createCombiner(value):
    return [value]
def mergeValue(c, value):
    return c.append(value)
def mergeCombiners(c1, c2):
    return c1+c2

rdd = data.combineByKey(createCombiner, mergeValue, mergeCombiners)

[('A', [9, 4, 2, 3]), ('B', [10, 20])]

于 2018-06-11T10:13:40.003 回答