
I have a collection of tuples in the form of composite keys and values. For example,

tfile.collect() = [(('id1','pd1','t1'), 5.0),
                   (('id2','pd2','t2'), 6.0),
                   (('id1','pd1','t2'), 7.5),
                   (('id1','pd1','t3'), 8.1)]

I want to perform SQL-like operations on this collection, where I can aggregate the information based on id[1..n] or pd[1..n]. I want to implement this using the vanilla PySpark APIs rather than SQLContext. In my current implementation I read from a set of files and merge the RDDs.

def readfile():
    fr = range(6, 23)
    # bind f as a default argument so each lambda keeps its own file number
    # (otherwise every lambda would see the final value of f when it is serialized)
    tfile = sc.union([sc.textFile(basepath + str(f) + ".txt")
                        .map(lambda view, f=f: set_feature(view, f))
                        .reduceByKey(lambda a, b: a + b)
                      for f in fr])
    return tfile

I intend to create an aggregated array as the value. For example,

agg_tfile = [((id1,pd1),[5.0,7.5,8.1])]

where 5.0, 7.5, 8.1 correspond to [t1,t2,t3]. I am currently achieving this with vanilla Python code using dictionaries. It works fine for smaller data sets, but I worry that it may not scale for larger ones. Is there an efficient way to achieve the same using the PySpark APIs?
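To make the target output concrete, here is a minimal sketch of the intended aggregation using plain RDD operations (assuming sc is an existing SparkContext; the sample data mirrors the collected tuples above, and the order of values within each list is not guaranteed):

sample = [(('id1', 'pd1', 't1'), 5.0),
          (('id2', 'pd2', 't2'), 6.0),
          (('id1', 'pd1', 't2'), 7.5),
          (('id1', 'pd1', 't3'), 8.1)]
tfile = sc.parallelize(sample)

# re-key by (id, pd), wrap each value in a one-element list,
# then concatenate the lists per key
agg_tfile = tfile.map(lambda kv: ((kv[0][0], kv[0][1]), [kv[1]])) \
                 .reduceByKey(lambda a, b: a + b)

print(agg_tfile.collect())
# e.g. [(('id1', 'pd1'), [5.0, 7.5, 8.1]), (('id2', 'pd2'), [6.0])]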


2 Answers


My guess is that you want to transpose the data based on multiple fields.

A simple approach is to concatenate the target fields you want to group by and make the result the key of a pair RDD. For example:

lines = sc.parallelize(['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0', 'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1'])
rdd = lines.map(lambda x: x.split(',')) \
           .map(lambda x: (x[0] + ', ' + x[1], x[3])) \
           .reduceByKey(lambda a, b: a + ', ' + b)
print(rdd.collect())

Then you get the transposed result:

[('id1, pd1', '5.0, 7.5, 8.1'), ('id2, pd2', '6.0')]
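If the numeric values are needed downstream, the joined strings can be split back apart; a small follow-up sketch, continuing from the rdd above:

# split the key string back into a tuple and the value string into a list of floats
parsed = rdd.map(lambda kv: (tuple(kv[0].split(', ')),
                             [float(v) for v in kv[1].split(', ')]))
print(parsed.collect())
# e.g. [(('id1', 'pd1'), [5.0, 7.5, 8.1]), (('id2', 'pd2'), [6.0])]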
Answered 2015-04-08T13:10:25.063

I group the data into the form ((id1,t1), [(p1,5.0),(p2,6.0)]), etc., in my map function. Later, I reduce using map_group, which creates an array for [p1, p2, . . . . ] and fills in the values at their respective positions.

import numpy as np

def map_group(pgroup):
    # build a fixed-size feature vector from the grouped (pd, value) pairs
    x = np.zeros(19)
    x[0] = 1                        # bias slot
    value_list = pgroup[1]
    for val in value_list:
        fno = val[0].split('.')[0]  # feature number encoded in the pd field
        x[int(fno) - 5] = val[1]    # place the value in its slot
    return x

tgbr = tfile.map(lambda d: ((d[0][0], d[0][2]), [(d[0][1], d[1])])) \
            .reduceByKey(lambda p, q: p + q) \
            .map(lambda d: (d[0], map_group(d)))

In terms of computation, this is an expensive solution, but it works for now.
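If the intermediate list concatenation becomes the bottleneck, one possible alternative is to fill the vector incrementally with combineByKey instead of building per-key lists first. This is only a sketch that reuses the field layout assumed by map_group above (19 slots, a bias term in slot 0, and the feature number encoded before the '.' in the pd field); note that the map now emits a bare (pd, value) pair rather than a one-element list:

import numpy as np

def merge_val(x, val):
    # fold one (pd, value) pair into the partial vector
    fno = int(val[0].split('.')[0])
    x[fno - 5] = val[1]
    return x

def create_vec(val):
    # start a fresh feature vector from the first (pd, value) pair seen for a key
    x = np.zeros(19)
    x[0] = 1            # bias slot, as in map_group
    return merge_val(x, val)

def merge_vecs(a, b):
    # merge partial vectors from different partitions;
    # assumes each feature slot is written at most once and values are non-zero
    return np.where(b != 0, b, a)

tgbr = tfile.map(lambda d: ((d[0][0], d[0][2]), (d[0][1], d[1]))) \
            .combineByKey(create_vec, merge_val, merge_vecs)

Whether this is actually faster depends on the data; the main saving is avoiding the many intermediate Python lists created by p + q in reduceByKey.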

Answered 2015-04-08T20:14:58.337