9

我想编写一个 map/reduce 作业,以根据行级条件从大型数据集中选择多个随机样本。我想尽量减少中间键的数量。

伪代码:

for each row 
  if row matches condition
    put the row.id in the bucket if the bucket is not already large enough

你做过这样的事情吗?有没有众所周知的算法?

包含连续行的样本也足够好。

谢谢。

4

3 回答 3

12

映射器:输出所有符合条件的值,每个值都有一个随机整数键。

单个reducer:输出前N个值,丢弃key。

排序器将为您随机化映射器输出顺序。您不知道映射器会找到多少个限定值,因此每个映射器必须从其分区中输出所有限定值。

一般来说,我喜欢构建像这样的简单映射器/缩减器工具,它们尽可能多地使用 Hadoop 机器;我最终在不同的任务中重用它们。

于 2010-03-25T19:12:08.450 回答
8

Karl 的方法工作得很好,但我们可以大大减少映射器产生的数据量。

K为您想要的样本数。我们假设它足够小,可以保存在您的一个节点上的内存中。我们将为每个匹配的行分配一个随机值,然后使用选择算法的修改来找到K个最小值。

在每个映射器的设置部分,创建一个优先队列斐波那契堆是一个不错的选择。我们将使用浮点数作为优先级;如果您有大量数据,双打可能更适合避免出现联系。对于与您的条件匹配的每一行,将该行插入优先级队列,随机选择一个介于 0 和 1 之间的浮点数作为优先级。如果队列中有超过K个东西,请删除价值最高的项目(这与标准斐波那契堆的术语相反)。

最后,在映射器的末尾,发出队列中的所有内容。对于您发出的每个项目,使用作为 a 的优先级作为键,FloatWritable并使用相应行的某些表示作为值(行 ID,或者可能是整个行的内容)。每个映射器只发出K个值(如果该映射器中匹配的行少于K个,则发出更少的值)。

在你的单个 reducer 中,Hadoop 将自动按从低到高的顺序扫描键。发出与您看到的前K个键对应的行( K最低),然后退出。

这是有效的,因为每个匹配行具有相同的概率具有K个最小浮点值之一。我们跟踪每个映射器的K个最小浮点数,以确保我们不会遗漏任何一个,然后将它们发送到 reducer 以找到整体上最小的K个浮点数。

于 2010-04-06T12:31:49.620 回答
2

Bkkbrad 的方法可能是最有效的,因为从每个映射器发出的记录数(最多)为 K。另一方面,请注意它假设样本本身(即 K 个元素)适合单个减速器。

如果不是这种情况,您可能会倾向于简单地使用完全分布式的方法,其中每个匹配行由映射器分配 {1,..,K} 中的随机整数,然后 reduce 阶段为每个键选择一个元素(另见这个问题)。但是,这种方法的问题在于,可能偶然没有将行分配给某些键,在这种情况下,最终样本的元素将少于 K。即使如果 K 远小于总行数 N 时这种情况发生的概率很小,但如果 K 是 N 的恒定分数(例如当 K=N/3 时),它也会以恒定概率发生。

一个可行的解决方案如下:假设我们有 B 个桶,并首先通过将每个元素放入一个随机桶中,然后在每个桶中生成一个随机排序来生成元素的随机排序。第一个桶中的元素被认为比第二个桶中的元素小(相对于排序),依此类推。然后,如果我们想选择一个大小为 K 的样本,我们可以收集前 j 个桶中的所有元素,如果它们总体上包含的元素数量 t 小于 K,然后从下一个桶中选择剩余的 Kt 个元素。这里 B 是一个参数,使得 N/B 个元素适合内存。关键方面是桶可以并行处理。

Mapper:输出所有符合条件的行,每行都有一个随机键 (j, r),其中 j 是 {1,..,B} 中的随机整数,r 是随机浮点数。此外,跟踪 key 小于 j 的元素的数量(对于 1<=j<=B)并将此信息传输到 reducer。

Shuffle:对 j 进行分区,对 r 进行二次排序。

Reducer:考虑桶 j 并假设 reducer 知道桶中有多少元素小于 j 以及桶 j 中有多少(通过聚合映射器接收到的信息)。如果桶中小于或等于j的元素个数小于或等于K,则输出桶j中的所有元素;如果桶中严格小于 j 的元素数量为 t < K,则运行水库抽样以从桶中选择 K-t 个随机元素;在剩下的情况下,即当桶中小于 j 的元素数量至少为 K 时,不输出任何内容。

我不知道这个问题的更简单的解决方案,但如果有一个会很好。

您可以在我的博客上找到更多详细信息。

于 2013-08-18T01:47:07.187 回答