16

有人可以举例说明地图减少中中位数/分位数的计算吗?

我对Datafu的中位数的理解是'n'个映射器对数据进行排序并将数据发送到“1”个reducer,该reducer负责对来自n个映射器的所有数据进行排序并找到中值(中间值)我的理解是否正确?,

如果是这样,这种方法是否适用于大量数据,因为我可以清楚地看到一个单一的减速器正在努力完成最后的任务。谢谢

4

4 回答 4

14

试图找到系列中的中位数(中间数)将需要 1 个 reducer 传递整个数字范围以确定哪个是“中间”值。

根据输入集中值的范围和唯一性,您可以引入组合器来输出每个值的频率 - 减少发送到单个减速器的地图输出数量。然后,您的减速器可以使用排序值/频率对来识别中位数。

另一种可以扩展的方法(如果您知道值的范围和粗略分布)是使用自定义分区器,该分区器按范围桶分配键(0-99 到减速器 0,100-199 到减速器 2,依此类推上)。然而,这将需要一些辅助工作来检查 reducer 输出并执行最终的中值计算(例如,知道每个 reducer 中的键数,您可以计算哪个 reducer 输出将包含中值以及偏移量)

于 2012-04-11T16:40:52.143 回答
7

你真的需要准确的中位数和分位数吗?

很多时候,您最好只获得近似值并使用它们,特别是如果您将其用于例如数据分区。

实际上,您可以使用近似分位数来加速找到确切的分位数(实际上是在O(n/p)时间上),这里是该策略的粗略概述:

  1. 让每个分区的映射器计算所需的分位数,并将它们输出到新的数据集。该数据集应该小几个数量级(除非您要求的分位数太多!)
  2. 在这个数据集中,再次计算分位数,类似于“中位数的中位数”。这些是您的初步估计。
  3. 根据这些分位数对数据进行重新分区(甚至以这种方式获得的附加分区)。目标是最终保证真正的分位数在一个分区中,并且每个分区最多应该有一个期望的分位数
  4. 在每个分区中,执行 QuickSelect (in O(n)) 以找到真正的分位数。

每个步骤都是线性时间。成本最高的步骤是第 3 部分,因为它需要重新分配整个数据集,因此会产生O(n)网络流量。您可以通过为第一次迭代选择“备用”分位数来优化该过程。说,你想找到全球中位数。您无法在线性过程中轻松找到它,但是当将其拆分为 k 个分区时,您可能可以将其缩小到数据集的 1/kth。因此,不是让每个节点报告其中值,而是让每个节点另外报告 (k-1)/(2k) 和 (k+1)/(2k) 处的对象。这应该允许您缩小真实中位数必须的值范围撒谎。因此在下一步中,您可以将每个节点在所需范围内的那些对象发送到单个主节点,并仅选择此范围内的中位数。

于 2012-09-08T14:38:09.507 回答
2

O((n log n)/p) 对它进行排序,然后 O(1) 得到中位数。

是的......你可以获得 O(n/p) 但你不能在 Hadoop 中使用开箱即用的排序功能。除非您能证明 2-20 小时的开发时间来编写并行的第 k 大算法,否则我只会排序并获得中心项目。

于 2012-04-12T18:22:11.077 回答
2

在许多实际场景中,数据集中值的基数相对较小。在这种情况下,可以通过两个 MapReduce 作业有效地解决问题:

  1. 计算数据集中值的频率(基本上是字数统计工作)
  2. 身份映射器 + 一个基于 <值 - 频率> 对计算中值的化简器

作业 1. 将大大减少数据量,并且可以完全并行执行。工作 2. 的 Reducer 只需要处理n( n= cardinality of your value set) 项而不是所有值,就像天真的方法一样。

下面是作业 2 的示例减速器。它是可以直接在 Hadoop 流中使用的 python 脚本。假设数据集中的值是ints,但可以很容易地用于doubles

import sys

item_to_index_range = []
total_count = 0

# Store in memory a mapping of a value to the range of indexes it has in a sorted list of all values
for line in sys.stdin:
    item, count = line.strip().split("\t", 1)
    new_total_count = total_count + int(count)
    item_to_index_range.append((item, (total_count + 1,   new_total_count + 1)))
    total_count = new_total_count

# Calculate index(es) of middle items
middle_items_indexes = [(total_count / 2) + 1]
if total_count % 2 == 0:
    middle_items_indexes += [total_count / 2]

# Retrieve middle item(s) 
middle_items = []
for i in middle_items_indexes:
    for item, index_range in item_to_index_range:
        if i in range(*index_range):
            middle_items.append(item)
            continue

print sum(middle_items) / float(len(middle_items))

这个答案建立在最初来自Chris White答案的建议之上。答案建议使用组合器作为计算值频率的平均值。但是,在 MapReduce 中,并不能保证始终执行组合器。这有一些副作用:

  • reducer 首先必须计算最终的 < 值 - 频率 > 对,然后计算中位数。
  • 在最坏的情况下,combiners 永远不会被执行,reducer 仍然必须努力处理所有单独的值
于 2016-07-22T17:28:00.817 回答