-1

我有一个使用PySpark创建的RDD ,在通过键值加入后大小约为600 GB,看起来就像这样。

[('43.72_-70.08', (('0744632', -70.08, 43.72, '2.4'), '18090865')),
 ('43.72_-70.08', (('0744632', -70.08, 43.72, '2.4'), '18090865')),
 ('43.25_-67.58', (('0753877', -67.58, 43.25, '7.2'), '18050868')),
 ('43.01_-75.24', (('0750567', -75.24, 43.01, '7.2'), '18042872'))]

我想要这样的东西并按第一个元素排序:

['0744632', '18090865', '2.4',
'0744632', '18090865', '2.4',
'0750567', '18042872', '7.2',
'0753877', '18050868', '7.2']

有没有办法从元组中获取数据并以所需格式获取输出。

注意:这是一个 600 GB 的 RDD,第一列有超过一百万个不同的值,大约。150 亿行,如果可能的话,我真的很感激一种优化的方式。

4

3 回答 3

0

在您的火花集群中执行此操作,例如:

In []:
(rdd.map(lambda x: (x[1][0][0], x[1][1], x[1][0][2]))
 .sortBy(lambda x: x[0])
 .flatMap(lambda x: x)
 .collect())

Out[]:
['0744632', '18090865', 43.72, '0744632', '18090865', 43.72, '0750567', 
 '18042872', 43.01, '0753877', '18050868', 43.25]

或者

In []:
import operator as op

(rdd.map(lambda x: (x[1][0][0], x[1][1], x[1][0][2]))
 .sortBy(lambda x: x[0])
 .reduce(op.add))

Out[]:
('0744632', '18090865', 43.72, '0744632', '18090865', 43.72, '0750567', 
 '18042872', 43.01, '0753877', '18050868', 43.25)

这似乎是一个相当笨拙的结构,如果您的意思是一个元组列表,那么只需消除flatMap()

In []:
(rdd.map(lambda x: (x[1][0][0], x[1][1], x[1][0][2]))
 .sortBy(lambda x: x[0])
 .collect())

Out[]:
[('0744632', '18090865', 43.72),
 ('0744632', '18090865', 43.72),
 ('0750567', '18042872', 43.01),
 ('0753877', '18050868', 43.25)]
于 2018-04-27T03:01:14.313 回答
0

这是一个简单的单行解决方案

sorted([(x[1][0][0], x[1][1], x[1][0][3]) for x in your_list]) 

我认为它比基于这篇文章的 lambda 解决方案略快 这两种解决方案有什么区别 - lambda 或循环 - Python

于 2018-04-27T03:27:13.507 回答
0

与其他 Spark 答案类似:

rdd=rdd.map(lambda (a,(b,c)): [b[0], c, b[3]])\
       .sortBy(lambda row: row[0])

你也可以使用 reduce 代替 flatMap:

rdd.reduce(lambda x,y: x+y)
于 2018-04-27T08:33:43.887 回答