1
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc , 10)
rdd = ssc.sparkContext.parallelize(pd_binance)
rdd.take(1)

这是结果的一小部分:

[['0.02703300', '1.30900000'],
   ['0.02703300', '0.61800000'],
   ['0.02704600', '3.90800000'],
   ['0.02704700', '4.00000000'],
   ['0.02704700', '7.44600000']

我想获得每个键的最大值和最小值,如何?

4

2 回答 2

2

正如@mck 所说,您可以使用 reduceByKey 但如果您从未使用过函数式编程,理解起来可能会有点复杂。

该方法的作用是将函数应用于执行 a 的结果值groupByKey。让我们一步一步来分析。

>>> rdd.groupByKey().take(1)
[('0.02704600', <pyspark.resultiterable.ResultIterable object at 0x7fac15f1fd90>)]

这样做我们获得了一个 RDD,每个键都有一个条目(配对 RDD 中的第一列),并且值是可迭代的。我们可以将其视为一个列表。

我们从基础 RDD 得到

[['0.02703300', '1.30900000'],
   ['0.02703300', '0.61800000'],
   ['0.02704600', '3.90800000'],
   ['0.02704700', '4.00000000'],
   ['0.02704700', '7.44600000']]

对一组

[('0.02704600', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2fe20>),
 ('0.02704700', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2f910>), 
 ('0.02703300', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2f550>)]

然后我们必须做的是在值上应用所需的函数。我们可以将所需的函数传递给mapValues方法(在我的例子中,我直接传递一个 lambda 函数)

>>> rdd.groupByKey().mapValues(lambda k: (max(k), min(k))).collect()
[('0.02704600', ('3.90800000', '3.90800000')), 
('0.02704700', ('7.44600000', '4.00000000')), 
('0.02703300', ('1.30900000', '0.61800000'))]

有一些考虑:

  1. reducebyKey更加整洁和高效。虽然可能会令人困惑
  2. 如果您想要最大值和最小值,请尝试在我展示的同时执行此操作(您也可以使用 reduceByKey 执行此操作)。这样,您只需执行一次即可,而不是对数据进行两次传递。
  3. 尝试使用 DataFrame (SQL) API。它更现代,它会尝试为您优化计算。
  4. reduceByKey函数需要有点不同,因为它得到两个项目而不是一个可迭代的
>>> rdd.reduceByKey(lambda a, b: (max(a,b), min(a, b))).collect()
[('0.02704600', '3.90800000'), 
('0.02704700', ('7.44600000', '4.00000000')), 
('0.02703300', ('1.30900000', '0.61800000'))]
于 2021-01-02T21:32:28.083 回答
2

您可以使用reduceByKey

minimum = rdd.reduceByKey(min)
maximum = rdd.reduceByKey(max)
于 2021-01-02T15:42:41.190 回答