正如@mck 所说,您可以使用 reduceByKey 但如果您从未使用过函数式编程,理解起来可能会有点复杂。
该方法的作用是将函数应用于执行 a 的结果值groupByKey
。让我们一步一步来分析。
>>> rdd.groupByKey().take(1)
[('0.02704600', <pyspark.resultiterable.ResultIterable object at 0x7fac15f1fd90>)]
这样做我们获得了一个 RDD,每个键都有一个条目(配对 RDD 中的第一列),并且值是可迭代的。我们可以将其视为一个列表。
我们从基础 RDD 得到
[['0.02703300', '1.30900000'],
['0.02703300', '0.61800000'],
['0.02704600', '3.90800000'],
['0.02704700', '4.00000000'],
['0.02704700', '7.44600000']]
对一组
[('0.02704600', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2fe20>),
('0.02704700', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2f910>),
('0.02703300', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2f550>)]
然后我们必须做的是在值上应用所需的函数。我们可以将所需的函数传递给mapValues
方法(在我的例子中,我直接传递一个 lambda 函数)
>>> rdd.groupByKey().mapValues(lambda k: (max(k), min(k))).collect()
[('0.02704600', ('3.90800000', '3.90800000')),
('0.02704700', ('7.44600000', '4.00000000')),
('0.02703300', ('1.30900000', '0.61800000'))]
有一些考虑:
reducebyKey
更加整洁和高效。虽然可能会令人困惑
- 如果您想要最大值和最小值,请尝试在我展示的同时执行此操作(您也可以使用 reduceByKey 执行此操作)。这样,您只需执行一次即可,而不是对数据进行两次传递。
- 尝试使用 DataFrame (SQL) API。它更现代,它会尝试为您优化计算。
reduceByKey
函数需要有点不同,因为它得到两个项目而不是一个可迭代的
>>> rdd.reduceByKey(lambda a, b: (max(a,b), min(a, b))).collect()
[('0.02704600', '3.90800000'),
('0.02704700', ('7.44600000', '4.00000000')),
('0.02703300', ('1.30900000', '0.61800000'))]