我一直在尝试编写一些代码来使用 MapReduce 查找数字的平均值。
我正在尝试使用全局计数器来达到我的目标,但我无法在map
我的 Mapper 方法中设置计数器值,我也无法在reduce
我的 Reducer 方法中检索计数器值。
我是否必须使用全局计数器map
(例如使用incrCounter(key, amount)
提供的Reporter
)?或者你会建议任何不同的逻辑来获得一些数字的平均值吗?
我一直在尝试编写一些代码来使用 MapReduce 查找数字的平均值。
我正在尝试使用全局计数器来达到我的目标,但我无法在map
我的 Mapper 方法中设置计数器值,我也无法在reduce
我的 Reducer 方法中检索计数器值。
我是否必须使用全局计数器map
(例如使用incrCounter(key, amount)
提供的Reporter
)?或者你会建议任何不同的逻辑来获得一些数字的平均值吗?
逻辑很简单:如果所有数字都具有相同的 key,那么映射器会发送您想要使用相同 key 求平均值的所有值。因此,在 reducer 中,您可以对迭代器中的值求和。然后,您可以在迭代器工作的时间上保留一个计数器,这解决了要平均多少项目的问题。最后,在迭代器之后,您可以通过将总和除以项目数来找到平均值。
请注意,如果将组合器类设置为与减速器相同的类,则此逻辑将不起作用...
使用所有 3 Mapper/Combiner/Reducer 来解决问题。请参阅以下链接以获取完整的代码和说明
http://alchemistviews.blogspot.com/2013/08/calculate-average-in-map-reduce-using.html
平均值是总和/大小。如果 sum 类似于 sum = k1 + k2 + k3 + ... ,则您可以在汇总之后或期间除以大小。所以平均值也是k1 / size + k2 / size + k3 / size + ...
Java 8 代码很简单:
public double average(List<Valuable> list) {
final int size = list.size();
return list
.stream()
.mapToDouble(element->element.someValue())
.reduce(0,(sum,x)->sum+x/size);
}
因此,您首先将列表中元素的每个值映射为 double,然后通过 reduce 函数求和。
算术平均值是一个聚合函数,它不是分布的,而是代数的。根据韩等人的说法。聚合函数是分布的,如果:
[...] 可以如下计算 [...]。假设 [..] 数据被划分为n 个集合。我们将函数应用于每个分区,得到n 个聚合值。如果将函数应用于n个聚合值得到的结果与将函数应用于整个数据集(没有分区)得到的结果相同,则可以以分布式方式计算函数。
或者换句话说,它必须是关联的和可交换的。然而,根据Han 等人的说法,聚合函数是代数的。如果:
[...] 它可以通过具有 m 个参数(其中 m 是有界正整数)的代数函数来计算,每个参数都是通过应用分布聚合函数获得的。
对于算术平均值,这只是avg = sum/count。显然你需要额外携带一个计数。但是为此使用全局计数器似乎是一种误用。API描述org.apache.hadoop.mapreduce.Counter
如下:
跟踪 map/reduce 作业进度的命名计数器。
无论如何,计数器通常应该用于有关作业的统计信息,而不是作为数据处理本身期间计算的一部分。
因此,您在分区中要做的一切就是将您的数字相加并与总和(sum, count)一起跟踪它们的计数;一个简单的方法可能是一个字符串,如<sum><separator><count>
.
在映射器中,计数始终为 1,总和就是原始值本身。要减少地图文件,您可以使用组合器并处理聚合,如(sum_1 + ... + sum_n, count_1 + ... + count_n)。这必须在 reducer 中重复并由最终计算sum/count完成。请记住,这种方法独立于使用的密钥!
最后,这是一个使用LAPD原始犯罪统计数据的简单示例,它应该计算洛杉矶的“平均犯罪时间”:
public class Driver extends Configured implements Tool {
enum Counters {
DISCARDED_ENTRY
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Driver(), args);
}
public int run(String[] args) throws Exception {
Configuration configuration = getConf();
Job job = Job.getInstance(configuration);
job.setJarByClass(Driver.class);
job.setMapperClass(Mapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(Combiner.class);
job.setReducerClass(Reducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : -1;
}
}
public class Mapper extends org.apache.hadoop.mapreduce.Mapper<
LongWritable,
Text,
LongWritable,
Text
> {
@Override
protected void map(
LongWritable key,
Text value,
org.apache.hadoop.mapreduce.Mapper<
LongWritable,
Text,
LongWritable,
Text
>.Context context
) throws IOException, InterruptedException {
// parse the CSV line
ArrayList<String> values = this.parse(value.toString());
// validate the parsed values
if (this.isValid(values)) {
// fetch the third and the fourth column
String time = values.get(3);
String year = values.get(2)
.substring(values.get(2).length() - 4);
// convert time to minutes (e.g. 1542 -> 942)
int minutes = Integer.parseInt(time.substring(0, 2))
* 60 + Integer.parseInt(time.substring(2,4));
// create the aggregate atom (a/n)
// with a = time in minutes and n = 1
context.write(
new LongWritable(Integer.parseInt(year)),
new Text(Integer.toString(minutes) + ":1")
);
} else {
// invalid line format, so we increment a counter
context.getCounter(Driver.Counters.DISCARDED_ENTRY)
.increment(1);
}
}
protected boolean isValid(ArrayList<String> values) {
return values.size() > 3
&& values.get(2).length() == 10
&& values.get(3).length() == 4;
}
protected ArrayList<String> parse(String line) {
ArrayList<String> values = new ArrayList<>();
String current = "";
boolean escaping = false;
for (int i = 0; i < line.length(); i++){
char c = line.charAt(i);
if (c == '"') {
escaping = !escaping;
} else if (c == ',' && !escaping) {
values.add(current);
current = "";
} else {
current += c;
}
}
values.add(current);
return values;
}
}
public class Combiner extends org.apache.hadoop.mapreduce.Reducer<
LongWritable,
Text,
LongWritable,
Text
> {
@Override
protected void reduce(
LongWritable key,
Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
Long n = 0l;
Long a = 0l;
Iterator<Text> iterator = values.iterator();
// calculate intermediate aggregates
while (iterator.hasNext()) {
String[] atom = iterator.next().toString().split(":");
a += Long.parseLong(atom[0]);
n += Long.parseLong(atom[1]);
}
context.write(key, new Text(Long.toString(a) + ":" + Long.toString(n)));
}
}
public class Reducer extends org.apache.hadoop.mapreduce.Reducer<
LongWritable,
Text,
LongWritable,
Text
> {
@Override
protected void reduce(
LongWritable key,
Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
Long n = 0l;
Long a = 0l;
Iterator<Text> iterator = values.iterator();
// calculate the finale aggregate
while (iterator.hasNext()) {
String[] atom = iterator.next().toString().split(":");
a += Long.parseLong(atom[0]);
n += Long.parseLong(atom[1]);
}
// cut of seconds
int average = Math.round(a / n);
// convert the average minutes back to time
context.write(
key,
new Text(
Integer.toString(average / 60)
+ ":" + Integer.toString(average % 60)
)
);
}
}