Here is an algorithm I devised:

Example problem

Suppose we want to sample 10 items from an RDD spread across 3 partitions, like this:
- P1: ("A", 0.10), ("B", 0.10), ("C", 0.20)
- P2: ("D", 0.25), ("E", 0.25)
- P3: ("F", 0.10)
Here is the high-level algorithm:

Input: the number of samples, and an RDD of items (with weights)
Output: a sample of the dataset
On the driver:
- For each partition, compute the total probability mass of the items on that partition, and aggregate these values on the driver.
- This gives a probability distribution over partitions:
  Prob(P1) = 0.40, Prob(P2) = 0.50, Prob(P3) = 0.10
- Sample partition indices from this distribution to determine how many elements to draw from each partition.
- Such a sample might look like this:
  [P1, P1, P1, P1, P2, P2, P2, P2, P2, P3]
- This gives us 4 items from P1, 5 items from P2, and 1 item from P3.
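The driver-side step above can be sketched in plain Scala, without Spark. The names `partitionProbs` and `drawPartition` are hypothetical, and the distribution is just the toy one from the example; the idea is a simple inverse-CDF draw repeated once per requested sample:

```scala
import scala.util.Random

object PartitionDraw {
  // Toy distribution over partitions from the example above:
  // Prob(P1) = 0.40, Prob(P2) = 0.50, Prob(P3) = 0.10
  val partitionProbs: Array[(String, Double)] =
    Array(("P1", 0.40), ("P2", 0.50), ("P3", 0.10))

  // Draw one partition label by inverting the cumulative distribution.
  def drawPartition(dist: Array[(String, Double)], rng: Random): String = {
    val p = rng.nextDouble()
    var cum = 0.0
    for ((label, w) <- dist) {
      cum += w
      if (p <= cum) return label
    }
    dist.last._1 // guard against floating-point round-off
  }

  def main(args: Array[String]): Unit = {
    val rng = new Random(42)
    // Draw 10 partition labels and count how many samples each partition owes.
    val counts = Array.fill(10)(drawPartition(partitionProbs, rng))
      .groupBy(identity).mapValues(_.length)
    println(counts)
  }
}
```

Counting the drawn labels gives exactly the per-partition sample sizes used in the next phase.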
On each individual partition, we locally generate a sample of the required size using only the elements on that partition:
- On P1, we sample 4 items with the (renormalized) probability distribution
  Prob(A) = 0.25, Prob(B) = 0.25, Prob(C) = 0.50.
  This might produce a sample such as [A, B, C, C].
- On P2, we sample 5 items with the distribution
  Prob(D) = 0.5, Prob(E) = 0.5.
  This might produce a sample such as [D, D, E, E, E].
- On P3, we sample 1 item with the distribution
  Prob(F) = 1.0,
  which produces the sample [F].
Finally, collect the samples on the driver to produce the dataset sample [A, B, C, C, D, D, E, E, E, F].

Here is an implementation in Scala:
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

case class Sample[T](weight: Double, obj: T)

/*
 * Obtain a sample of size `numSamples` from an RDD `ar` using a
 * two-phase distributed sampling approach.
 */
def sampleWeightedRDD[T: ClassTag](ar: RDD[Sample[T]], numSamples: Int)(implicit sc: SparkContext): Array[T] = {
  // 1. Get the total weight on each partition
  var partitionWeights = ar.mapPartitionsWithIndex { case (partitionIndex, iter) =>
    Iterator((partitionIndex, iter.map(_.weight).sum))
  }.collect()
  // Normalize to 1.0
  val Z = partitionWeights.map(_._2).sum
  partitionWeights = partitionWeights.map { case (partitionIndex, weight) => (partitionIndex, weight / Z) }
  // 2. Sample partition indexes to determine the number of samples to draw from each partition
  val samplesPerIndex = sc.broadcast(
    sample[Int](partitionWeights, numSamples).groupBy(identity).mapValues(_.size).toMap
  )
  // 3. On each partition, sample the number of elements needed for that partition
  ar.mapPartitionsWithIndex { case (partitionIndex, iter) =>
    val numSamplesForPartition = samplesPerIndex.value.getOrElse(partitionIndex, 0)
    var localItems = iter.map(x => (x.obj, x.weight)).toArray
    // Normalize the local weights to 1.0
    val localZ = localItems.map(_._2).sum
    localItems = localItems.map { case (obj, weight) => (obj, weight / localZ) }
    sample(localItems, numSamplesForPartition).toIterator
  }.collect()
}
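A usage sketch, assuming a local SparkContext; the `local[3]` master and the toy weights mirror the worked example, though `parallelize` may split the items across partitions differently than the P1/P2/P3 layout shown above (the algorithm is indifferent to the layout):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Build the toy RDD from the example: 6 weighted items over 3 partitions.
implicit val sc: SparkContext =
  new SparkContext(new SparkConf().setMaster("local[3]").setAppName("weighted-sampling"))

val items = Seq(
  Sample(0.10, "A"), Sample(0.10, "B"), Sample(0.20, "C"),
  Sample(0.25, "D"), Sample(0.25, "E"),
  Sample(0.10, "F")
)
val rdd = sc.parallelize(items, numSlices = 3)

// Draw 10 items; heavier items (C, D, E) should tend to appear more often.
val drawn = sampleWeightedRDD(rdd, numSamples = 10)
println(drawn.mkString(", "))

sc.stop()
```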
This code uses a simple weighted sampling function, sample:
// A very simple weighted sampling function (inverse-CDF sampling).
// Assumes the probabilities in `dist` sum to 1.0.
def sample[T: ClassTag](dist: Array[(T, Double)], numSamples: Int): Array[T] = {
  // Sort by descending probability so likely elements are found first
  val probs = dist.sortBy(-_._2)
  // Cumulative distribution: running sums of the probabilities
  val cumulativeDist = probs.map(_._2).scanLeft(0.0)(_ + _).drop(1)
  // Find the first index whose cumulative probability covers p
  def findElem(p: Double): Int = {
    for (i <- 0 until cumulativeDist.length - 1)
      if (p <= cumulativeDist(i)) return i
    cumulativeDist.length - 1
  }
  Array.fill(numSamples)(probs(findElem(scala.util.Random.nextDouble))._1)
}
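As a quick sanity check (assuming the `sample` function above is in scope), drawing many items from P1's renormalized distribution should produce empirical frequencies close to the true probabilities:

```scala
// Renormalized P1 distribution from the worked example.
val p1 = Array(("A", 0.25), ("B", 0.25), ("C", 0.50))

// Draw a large sample and compare empirical frequencies to the target.
val n = 100000
val freqs = sample(p1, n).groupBy(identity).mapValues(_.length.toDouble / n)

// Expect roughly A -> 0.25, B -> 0.25, C -> 0.50, up to sampling noise.
println(freqs)
```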