
Consider the following log file format:

id        v1        v2        v3
1         15        30        25
2         10        10        20
3         50        30        30

We want to use dumbo to compute the Average Value Frequency (AVF) of each data row on a Hadoop cluster. The AVF of a data point with m attributes is defined as:

avf = (1/m) * sum(frequencies of the point's attribute values, 1..m)

So for the first row, avf = (1/3)*(1+2+1) ≈ 1.33. Outliers are identified by a low AVF.
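As a sanity check, the definition can be sketched in plain Python on the sample table above (an in-memory illustration only, not dumbo code; the `rows` layout is a hypothetical choice):

```python
from collections import Counter

# Sample data: id -> attribute values (columns v1..v3)
rows = {1: [15, 30, 25], 2: [10, 10, 20], 3: [50, 30, 30]}
m = 3

# Per-column frequency tables: freq[col][value] = how often value occurs in that column
freq = [Counter(vals[col] for vals in rows.values()) for col in range(m)]

def avf(vals):
    """Average frequency of this row's attribute values."""
    return sum(freq[col][v] for col, v in enumerate(vals)) / m

print(avf(rows[1]))  # (1 + 2 + 1) / 3 ≈ 1.33
print(avf(rows[2]))  # 1.0 -- the lowest AVF, so row 2 is the outlier candidate
```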

The programming question

We have the following pseudo/Python code:

H = {}  # stores attribute frequencies

map1(_, datapoint):
  for attr in datapoint.attrs:
    yield (attr, 1)

reduce1(attr, values):
  H[attr] = sum(values)

map2(_, datapoint):
  sum = 0
  m = len(datapoint.attrs)
  for attr in datapoint.attrs:
    sum += H[attr]        

  yield (1/m)*sum, datapoint

reduce2(avf, datapoints): # identity reducer, only sorts datapoints on avf
  yield avf, datapoints

The question is: how do we feed our set of data points into map1 and map2, and how do we use the intermediate hash H inside map2? Defining H globally, as above, seems to go against the MapReduce model.


1 Answer


If I understand correctly, the first step is to compute a histogram:

[attr, value] => frequency

where frequency is the number of times value occurs in column attr.

The next step is to take that histogram and the original data, compute the AVF of each row, and sort the rows by it.

I'd do this in two passes: one map-reduce pass to compute the histogram, and a second map-reduce pass that uses the histogram to find the AVFs. I'd also use a single big hash without guilt, because getting the histogram values and the cell values to the same place any other way becomes a messy beast. (For example: have map1 emit [attr val id] with [attr val] as the key; have reduce1 accumulate all records for each key, count them, and emit [id attr val count]; then a second pass regroups on id as the key and averages each row.)
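That parenthetical variant can be sketched in plain Python (a hypothetical in-memory simulation, not dumbo code, with `sorted()` standing in for the shuffle):

```python
from itertools import groupby
from operator import itemgetter

table = [(1, [15, 30, 25]), (2, [10, 10, 20]), (3, [50, 30, 30])]
attrs = ["v1", "v2", "v3"]

# map1: emit ((attr, val), id) -- keyed on the attribute/value pair
mapped = [((attr, val), rid)
          for rid, values in table
          for attr, val in zip(attrs, values)]

# reduce1: for each (attr, val) key, count its records, then
# re-emit (id, count) so each row gets the frequency of its cell
counted = []
for _key, group in groupby(sorted(mapped), key=itemgetter(0)):
    ids = [rid for _, rid in group]
    for rid in ids:
        counted.append((rid, len(ids)))

# pass 2: regroup on id and average the per-cell counts
avf = {}
for rid, group in groupby(sorted(counted), key=itemgetter(0)):
    counts = [c for _, c in group]
    avf[rid] = sum(counts) / len(counts)

print(avf)  # row 2 comes out lowest (1.0): the outlier candidate
```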


To compute the histogram, it helps to think of the middle step as a "group" rather than a "sort". Here's how: since the reduce input arrives sorted by key, have the reducer accumulate all records for a given key, and as soon as it sees a different key, emit the count. Wukong, a Ruby analogue of dumbo, has an Accumulator for this, and I'd guess dumbo has one too. (See below for working code.)
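That "group, don't sort" reducer can be sketched as a plain Python generator (a hypothetical stand-in for a streaming reducer, assuming tab-separated, key-sorted input lines; the function name is my invention):

```python
def histogram_reducer(lines):
    """Consume key-sorted "attr\\tval" lines; emit (attr, val, count) per key.

    Accumulates while the key stays the same and flushes the running
    count each time a new key appears -- grouping, not re-sorting.
    """
    prev_key, count = None, 0
    for line in lines:
        key = tuple(line.rstrip("\n").split("\t"))
        if prev_key is not None and key != prev_key:
            yield (*prev_key, count)
            count = 0
        prev_key = key
        count += 1
    if prev_key is not None:          # flush the final group
        yield (*prev_key, count)

sorted_input = ["v1\t10", "v2\t30", "v2\t30", "v3\t25"]
print(list(histogram_reducer(sorted_input)))
# [('v1', '10', 1), ('v2', '30', 2), ('v3', '25', 1)]
```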

This leaves you with

attr1    val1a      frequency
attr1    val1b      frequency
attr2    val2a      frequency
...
attrN    valNz      frequency

For the next pass, I'd load that data into a hash table: a plain Hash (dictionary) if it fits in memory, or a fast key-value store if it doesn't, and compute the AVF of each record just as you have it.
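In plain Python that second pass might look like this (a hypothetical sketch; the histogram lines and helper name are made up for illustration):

```python
# Load the "attr \t val \t freq" output of the first pass into a dict
histogram_lines = ["v1\t15\t1", "v2\t30\t2", "v3\t25\t1"]
hist = {}
for line in histogram_lines:
    attr, val, freq = line.rstrip("\n").split("\t")
    hist[(attr, val)] = int(freq)

attrs = ["v1", "v2", "v3"]

def avf_of(values):
    """Average the looked-up frequency of each cell in the row."""
    total = sum(hist.get((attr, val), 0) for attr, val in zip(attrs, values))
    return total / len(attrs)

print(avf_of(["15", "30", "25"]))  # (1 + 2 + 1) / 3 ≈ 1.33
```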


Here is working Ruby code that computes the AVF; see http://github.com/mrflip/wukong/blob/master/examples/stats/avg_value_frequency.rb

First pass

module AverageValueFrequency
  # Names for each column's attribute, in order
  ATTR_NAMES = %w[length width height]

  class HistogramMapper < Wukong::Streamer::RecordStreamer
    def process id, *values
      ATTR_NAMES.zip(values).each{|attr, val| yield [attr, val] }
    end
  end

  #
  # For an accumulator, you define a key that is used to group records
  #
  # The Accumulator calls #start! on the first record for that group,
  # then calls #accumulate on all records (including the first).
  # Finally, it calls #finalize to emit a result for the group.
  #
  class HistogramReducer < Wukong::Streamer::AccumulatingReducer
    attr_accessor :count

    # use the attr and val as the key
    def get_key attr, val, *_
      [attr, val]
    end

    # start the sum with 0 for each key
    def start! *_
      self.count = 0
    end
    # ... and count the number of records for this key
    def accumulate *_
      self.count += 1
    end
    # emit [attr, val, count]
    def finalize
      yield [key, count].flatten
    end
  end
end

Wukong::Script.new(AverageValueFrequency::HistogramMapper, AverageValueFrequency::HistogramReducer).run

Second pass

module AverageValueFrequency
  class AvfRecordMapper < Wukong::Streamer::RecordStreamer
    # average the frequency of each value
    def process id, *values
      sum = 0.0
      ATTR_NAMES.zip(values).each do |attr, val|
        sum += histogram[ [attr, val] ].to_i
      end
      avf = sum / ATTR_NAMES.length.to_f
      yield [id, avf, *values]
    end

    # Load the histogram from a tab-separated file with
    #   attr    val   freq
    def histogram
      return @histogram if @histogram
      @histogram = { }
      File.open(options[:histogram_file]).each do |line|
        attr, val, freq = line.chomp.split("\t")
        @histogram[ [attr, val] ] = freq
      end
      @histogram
    end
  end
end
Answered 2010-02-03T02:40:53