
In PySpark, I have an RDD of (key, value) pairs, where the keys are sequential integers and the values are floats.

I would like to sample exactly one element from this RDD, with probability proportional to value.

A naive way to do this is the following:

import numpy

pairs = myRDD.collect()        # now pairs is a list of (key, value) tuples
K, V = zip(*pairs)             # separate keys and values
V = numpy.array(V) / sum(V)    # normalise probabilities
extractedK = numpy.random.choice(K, size=1, replace=True, p=V)

As you may have guessed, my concern is that collect() loads the whole list of tuples into memory, which can be very expensive. I am aware of takeSample(), which is fine when the elements should be drawn uniformly, but what happens if the elements should be drawn according to weighted probabilities?

Thanks!


2 Answers


Here is an algorithm I worked out:

Example problem

Assume we want to sample 10 items from an RDD spread over 3 partitions, like this:

  • P1: ("A", 0.10), ("B", 0.10), ("C", 0.20)
  • P2: ("D", 0.25), ("E", 0.25)
  • P3: ("F", 0.10)

Here is the high-level algorithm; a rough PySpark sketch of these steps follows the list.

Input: the number of samples and an RDD of items (with weights)

Output: the dataset sample on the driver

  1. For each partition, compute the total probability of sampling from that partition, and aggregate these values to the driver.
    • This gives the probability distribution: Prob(P1) = 0.40, Prob(P2) = 0.50, Prob(P3) = 0.10.
  2. Generate a sample of the partitions (to determine how many elements to select from each partition).
    • The sample might look like this: [P1, P1, P1, P1, P2, P2, P2, P2, P2, P3]
    • This gives us 4 items from P1, 5 items from P2 and 1 item from P3.
  3. On each individual partition, locally generate a sample of the required size using only the elements on that partition:
    • On P1, sample 4 items with the (re-normalized) probability distribution Prob(A) = 0.25, Prob(B) = 0.25, Prob(C) = 0.50. This could yield a sample such as [A, B, C, C].
    • On P2, sample 5 items with the probability distribution Prob(D) = 0.5, Prob(E) = 0.5. This could yield a sample such as [D, D, E, E, E].
    • On P3, sample 1 item with the probability distribution P(F) = 1.0, which yields the sample [F].
  4. Collect the samples to the driver to produce your dataset sample [A, B, C, C, D, D, E, E, E, F].
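Since the question is about PySpark, here is a rough sketch of these steps in Python. It is only an illustration of the idea, not a polished implementation: the name sample_weighted_rdd is made up, numpy is assumed to be available on the executors, and samples assigned to an empty partition are simply dropped.

import numpy as np

def sample_weighted_rdd(weighted_rdd, num_samples, sc):
    # Step 1: total weight per partition, collected to the driver
    def partition_weight(index, iterator):
        yield (index, sum(w for _, w in iterator))
    part_weights = dict(weighted_rdd.mapPartitionsWithIndex(partition_weight).collect())

    # Step 2: decide how many samples to draw from each partition
    indexes = sorted(part_weights)
    probs = np.array([part_weights[i] for i in indexes], dtype=float)
    counts = np.random.multinomial(num_samples, probs / probs.sum())
    samples_per_partition = sc.broadcast(dict(zip(indexes, counts.tolist())))

    # Step 3: on each partition, draw its share locally with re-normalized weights
    def sample_partition(index, iterator):
        n = samples_per_partition.value.get(index, 0)
        items = list(iterator)
        if n == 0 or not items:
            return iter([])
        keys = [k for k, _ in items]
        weights = np.array([w for _, w in items], dtype=float)
        chosen = np.random.choice(len(keys), size=n, replace=True, p=weights / weights.sum())
        return iter([keys[i] for i in chosen])

    # Step 4: collect the local samples to the driver
    return weighted_rdd.mapPartitionsWithIndex(sample_partition).collect()

With the example above, sample_weighted_rdd(myRDD, 10, sc) would return something like [A, B, C, C, D, D, E, E, E, F]; the multinomial draw over partition weights plays the role of step 2.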

Here is an implementation in Scala:

import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

case class Sample[T](weight: Double, obj: T)

/*
 * Obtain a sample of size `numSamples` from an RDD `ar` using a two-phase distributed sampling approach.
 */
def sampleWeightedRDD[T: ClassTag](ar: RDD[Sample[T]], numSamples: Int)(implicit sc: SparkContext): Array[T] = {
  // 1. Get the total weight on each partition
  var partitionWeights = ar.mapPartitionsWithIndex{ case (partitionIndex, iter) =>
    Iterator((partitionIndex, iter.map(_.weight).sum))
  }.collect()

  // Normalize to 1.0
  val Z = partitionWeights.map(_._2).sum
  partitionWeights = partitionWeights.map{ case (partitionIndex, weight) => (partitionIndex, weight / Z) }

  // 2. Sample from the partition indexes to determine the number of samples to take from each partition
  val samplesPerIndex = sc.broadcast(
    sample[Int](partitionWeights, numSamples).groupBy(x => x).mapValues(_.size).toMap
  ).value

  // 3. On each partition, sample the number of elements needed for that partition
  ar.mapPartitionsWithIndex{ case (partitionIndex, iter) =>
    val numSamplesForPartition = samplesPerIndex.getOrElse(partitionIndex, 0)
    var ar = iter.map(x => (x.obj, x.weight)).toArray

    // Normalize to 1.0
    val Z = ar.map(_._2).sum
    ar = ar.map{ case (obj, weight) => (obj, weight / Z) }
    sample(ar, numSamplesForPartition).toIterator
  }.collect()
}

This code uses a simple weighted sampling function, sample:

// a very simple weighted sampling function
def sample[T: ClassTag](dist: Array[(T, Double)], numSamples: Int): Array[T] = {

  val probs = dist.zipWithIndex.map{ case ((elem, prob), idx) => (elem, prob, idx + 1) }.sortBy(-_._2)
  val cumulativeDist = probs.map(_._2).scanLeft(0.0)(_ + _).drop(1)

  (1 to numSamples).toArray.map(x => scala.util.Random.nextDouble).map{ case (p) =>

    // return the first index whose cumulative probability reaches p
    def findElem(p: Double, cumulativeDist: Array[Double]): Int = {
      for (i <- 0 until cumulativeDist.size - 1)
        if (p <= cumulativeDist(i)) return i
      cumulativeDist.size - 1
    }

    probs(findElem(p, cumulativeDist))._1
  }
}
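For reference, the same cumulative-distribution (inverse-CDF) trick is short in Python with numpy. This is only a sketch, not part of the Scala implementation above: weighted_sample is a hypothetical name, and the clamp at the end merely guards against floating-point round-off at the top of the CDF.

import numpy as np

def weighted_sample(elems, weights, num_samples):
    # cumulative distribution over the normalized weights
    w = np.asarray(weights, dtype=float)
    cdf = np.cumsum(w / w.sum())
    # invert the CDF for each uniform draw (the analogue of findElem above)
    draws = np.random.random(num_samples)
    idx = np.minimum(np.searchsorted(cdf, draws, side="left"), len(cdf) - 1)
    return [elems[i] for i in idx]

# e.g. weighted_sample(["A", "B", "C"], [0.10, 0.10, 0.20], 4) might return ['C', 'A', 'C', 'B']

np.searchsorted performs the same lookup as the findElem loop, just as a binary search instead of a linear scan.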
Answered on 2017-08-15T01:51:24.370

This is basically doable, but you should really consider whether it makes sense to use Spark for it. If you need to draw random values, you presumably need to do so repeatedly, in a loop, and each iteration requires scanning all of the data (possibly more than once).

So fitting the data you need into memory and then drawing random values from it is almost certainly the right approach. If your data really is too large to fit in memory, consider (a) collecting only the columns you need for this purpose, and (b) whether your data can be binned in a meaningful way.
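For example, with the DataFrame df and the columns ky and vl used in the code below, collecting just those two columns and sampling on the driver could look roughly like this (a sketch of the "fit it into memory" approach, not part of the original code):

import numpy as np

# collect only the two columns needed for sampling; everything else stays in Spark
rows = df.select("ky", "vl").collect()
keys = [r["ky"] for r in rows]
weights = np.array([r["vl"] for r in rows], dtype=float)

# draw as many samples as needed without touching the cluster again
sampled_keys = np.random.choice(keys, size=30, replace=True, p=weights / weights.sum())

This is essentially the approach from the question, just restricted to the columns that are actually needed.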

That said, it can be done in Spark. Below is pyspark code demonstrating the idea.

import random
import pyspark.sql.functions as F
from pyspark.sql.window import Window
# read some sample data (shown below)
df = spark.read.csv("prb.csv",sep='\t',inferSchema=True,header=True)
# find the sum of the value column
ss = df.groupBy().agg( F.sum("vl").alias("sum") ).collect()
# add a column to store the normalized values
q = df.withColumn("nrm_vl", (df["vl"] / ss[0].sum) )
w = Window.partitionBy().orderBy("nrm_vl")\
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)
q = q.select("*", F.sum("nrm_vl").over(w).alias("cum_vl"))
q.show()
+---+---+-------------------+-------------------+
| ky| vl|             nrm_vl|             cum_vl|
+---+---+-------------------+-------------------+
|  2|0.8|0.07079646017699115|0.07079646017699115|
|  3|1.1|0.09734513274336283|0.16814159292035397|
|  4|1.7|0.15044247787610618| 0.3185840707964601|
|  0|3.2| 0.2831858407079646| 0.6017699115044247|
|  1|4.5| 0.3982300884955752| 0.9999999999999999|
+---+---+-------------------+-------------------+

def getRandVl(q):
    # choose a random number and find the row that is
    # less than and nearest to the random number
    # (analog to `std::lower_bound` in C++)
    chvl = q.where( q["cum_vl"] > random.random() ).groupBy().agg(
        F.min(q["cum_vl"]).alias("cum_vl") )
    return q.join(chvl, on="cum_vl", how="inner")
# draw one sample to initialize the result DataFrame `cdf`,
# then get 30 more random samples.. this is already slow
# on a single machine.
cdf = getRandVl(q)
for i in range(0, 30):
    x = getRandVl(q)
    # add this row. there's no reason to do this (it's slow)
    # except that it's convenient to count how often each
    # key was chosen, to check if this method works
    cdf = cdf.select(cdf.columns).union(x.select(cdf.columns))

# count how often we picked each key
cdf.groupBy("ky","vl").agg( F.count("*").alias("count") ).show()
+---+---+-----+                                                                 
| ky| vl|count|
+---+---+-----+
|  4|1.7|    4|
|  2|0.8|    1|
|  3|1.1|    3|
|  0|3.2|   11|
|  1|4.5|   12|
+---+---+-----+

Given the values, I think these counts are reasonable. I would rather test it with many more samples, but it is too slow.

Answered on 2017-06-04T21:02:32.427