If I understand correctly, the first step is to calculate a histogram:
[attr, value] => frequency
where frequency is the number of times value occurs in the attr column.
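As a rough in-memory illustration of that histogram (not the map-reduce version), here is a minimal sketch. The method name `build_histogram` and the sample rows are made up for this example, and each row is assumed to be laid out as `[id, value_1, value_2, ...]`.

```ruby
# Count how often each [attr, value] pair occurs across all rows.
# Each row is assumed to look like [id, value_1, value_2, ...].
def build_histogram(rows, attr_names)
  histogram = Hash.new(0)
  rows.each do |_id, *values|
    attr_names.zip(values).each { |attr, val| histogram[[attr, val]] += 1 }
  end
  histogram
end

# Hypothetical sample data
attr_names = %w[length width height]
rows = [
  %w[r1 long  wide   tall],
  %w[r2 long  narrow tall],
  %w[r3 short wide   tall],
]

# Print one "attr<TAB>val<TAB>frequency" line per distinct pair
build_histogram(rows, attr_names).each do |(attr, val), freq|
  puts [attr, val, freq].join("\t")
end
```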
The next step is to take the histogram and the original data, calculate the AVF for each row, and sort the rows by it.
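In plain Ruby, that scoring step might look like the sketch below. The names `avf` and `rows_by_avf` are hypothetical, as is the sample data. Treating an unseen value as frequency 0 and sorting in ascending order are assumptions, not something the question specifies.

```ruby
# AVF of one row: the mean frequency of its attribute values.
# Values missing from the histogram count as frequency 0 (an assumption).
def avf(values, attr_names, histogram)
  total = attr_names.zip(values).sum { |attr, val| histogram.fetch([attr, val], 0) }
  total / attr_names.length.to_f
end

# Returns [id, avf] pairs sorted by ascending AVF.
def rows_by_avf(rows, attr_names, histogram)
  scored = rows.map { |id, *values| [id, avf(values, attr_names, histogram)] }
  scored.sort_by { |_id, score| score }
end

# Hypothetical histogram and rows
attr_names = %w[length width]
histogram  = { %w[length long] => 3, %w[length short] => 1, %w[width wide] => 4 }
rows       = [%w[a long wide], %w[b short wide], %w[c short unknown]]

rows_by_avf(rows, attr_names, histogram).each { |id, score| puts "#{id}\t#{score}" }
```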
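I'd do it in two passes: one map-reduce pass to calculate the histogram, and a second map-reduce pass to find the AVFs using the histogram. I'd also use a single constant hash without guilt, because getting the histogram values and the cell values to the same locality would be a messy beast. (For instance, have map1 emit [attr val id] using [attr val] as the key; have reduce1 accumulate all the records for each key, count them, and emit [id attr val count]. The second pass then uses id as the key to reassemble each row and averages it.)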
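To show why that hash-free alternative is more work, here is a small in-memory simulation of it. The method names `map1`, `reduce1` and `reduce2` are illustrative only, and `group_by` stands in for the shuffle that Hadoop would perform between map and reduce.

```ruby
# Pass 1 map: one [attr, val, id] record per cell, keyed by [attr, val].
def map1(rows, attr_names)
  rows.flat_map do |id, *values|
    attr_names.zip(values).map { |attr, val| [attr, val, id] }
  end
end

# Pass 1 reduce: count the records for each [attr, val] key, then emit
# [id, attr, val, count] for every record in that group.
def reduce1(records)
  grouped = records.group_by { |attr, val, _id| [attr, val] }
  grouped.flat_map do |(attr, val), group|
    group.map { |_attr, _val, id| [id, attr, val, group.size] }
  end
end

# Pass 2 reduce: regroup by id and average the counts of each row.
def reduce2(records)
  records.group_by(&:first).map do |id, group|
    total = group.sum { |_id, _attr, _val, count| count }
    [id, total / group.size.to_f]
  end
end

# Hypothetical sample data
attr_names = %w[length width]
rows = [%w[r1 long wide], %w[r2 long narrow], %w[r3 short wide]]

reduce2(reduce1(map1(rows, attr_names))).each { |id, score| puts "#{id}\t#{score}" }
```

Every cell travels through the shuffle twice here, which is the cost the single in-memory hash avoids.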
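To calculate the histogram, it helps to think of the middle step as a "group" rather than a "sort". Here's how: since the reduce input is sorted by key, have the reducer accumulate all records for a given key and, as soon as it sees a different key, emit the count. Wukong, the Ruby equivalent of dumbo, has an Accumulator, and I assume dumbo has one as well. (See below for working code.)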
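Stripped of any framework, the group-on-key-change idea can be sketched as below. The method name `count_sorted_keys` is hypothetical, and the input is assumed to be already sorted lines whose first two tab-separated fields are attr and val.

```ruby
# Counts runs of identical keys in an already-sorted stream of
# "attr<TAB>val" lines, yielding [attr, val, count] each time the key changes.
def count_sorted_keys(lines)
  current_key = nil
  count = 0
  lines.each do |line|
    key = line.chomp.split("\t")[0, 2]
    if key != current_key
      # a new key has started: emit the finished group, then reset
      yield [*current_key, count] unless current_key.nil?
      current_key = key
      count = 0
    end
    count += 1
  end
  # emit the last group, if there was any input at all
  yield [*current_key, count] unless current_key.nil?
end

# Hypothetical sorted input
sorted = ["height\ttall", "height\ttall", "length\tlong", "length\tshort"]
count_sorted_keys(sorted) { |record| puts record.join("\t") }
```

Only one key and one counter are held in memory at a time, which is why this works on inputs far larger than RAM as long as they arrive sorted.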
This leaves you with
attr1 val1a frequency
attr1 val1b frequency
attr2 val2a frequency
...
attrN valNz frequency
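For the next pass, I'd load that data into a hash table: a simple Hash (dictionary) if it fits in memory, a fast key-value store if it doesn't. Then calculate each record's AVF just as you had it.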
Here is working Ruby code to calculate the AVF; see http://github.com/mrflip/wukong/blob/master/examples/stats/avg_value_frequency.rb
First pass
require 'wukong'

module AverageValueFrequency
  # Names for each column's attribute, in order
  ATTR_NAMES = %w[length width height]

  class HistogramMapper < Wukong::Streamer::RecordStreamer
    # emit one [attr, val] pair per cell; the id is not needed for counting
    def process id, *values
      ATTR_NAMES.zip(values).each{|attr, val| yield [attr, val] }
    end
  end

  #
  # For an accumulator, you define a key that is used to group records
  #
  # The Accumulator calls #start! on the first record for that group,
  # then calls #accumulate on all records (including the first).
  # Finally, it calls #finalize to emit a result for the group.
  #
  class HistogramReducer < Wukong::Streamer::AccumulatingReducer
    attr_accessor :count

    # use the attr and val as the key
    def get_key attr, val, *_
      [attr, val]
    end

    # start the count at 0 for each key
    def start! *_
      self.count = 0
    end

    # ... and count the number of records for this key
    def accumulate *_
      self.count += 1
    end

    # emit [attr, val, count]
    def finalize
      yield [key, count].flatten
    end
  end
end

Wukong::Script.new(AverageValueFrequency::HistogramMapper, AverageValueFrequency::HistogramReducer).run
Second pass
module AverageValueFrequency
  # ATTR_NAMES is the constant defined in the first-pass listing
  class AvfRecordMapper < Wukong::Streamer::RecordStreamer
    # average the frequency of each value
    def process id, *values
      sum = 0.0
      ATTR_NAMES.zip(values).each do |attr, val|
        # a pair missing from the histogram gives nil, and nil.to_i is 0
        sum += histogram[ [attr, val] ].to_i
      end
      avf = sum / ATTR_NAMES.length.to_f
      yield [id, avf, *values]
    end

    # Load the histogram (once) from a tab-separated file with
    #   attr val freq
    def histogram
      return @histogram if @histogram
      @histogram = { }
      # File.foreach closes the file when it has read the last line
      File.foreach(options[:histogram_file]) do |line|
        attr, val, freq = line.chomp.split("\t")
        @histogram[ [attr, val] ] = freq.to_i
      end
      @histogram
    end
  end
end