3

我对 Hadoop Map/Reduce 相当陌生。我正在尝试编写一个 Map/Reduce 作业来查找 n 个进程所花费的平均时间,给定如下输入文本文件:

ProcessName Time
process1    10
process2    20
processn    30

我经历了一些教程,但我仍然无法彻底理解。我的 mapper 和 reducer 类应该如何解决这个问题?我的输出将始终是文本文件,还是可以将平均值直接存储在某种变量中?

谢谢。

4

2 回答 2

3

您的映射器将您的输入映射到您想要取平均值的值。因此,假设您的输入是一个文本文件,格式如下

ProcessName Time
process1    10
process2    20
.
.
.

然后,您需要获取文件中的每一行,将其拆分,获取第二列,并将该列的值输出为IntWritable(或其他Writable数字类型)。由于您想取所有时间的平均值,而不是按进程名称或任何内容分组,因此您将拥有一个固定键。因此,您的映射器看起来像

private IntWritable one = new IntWritable(1);
private IntWritable output = new IntWritable();
proctected void map(LongWritable key, Text value, Context context) {
    String[] fields = value.split("\t");
    output.set(Integer.parseInt(fields[1]));
    context.write(one, output);
}

您的减速器采用这些值,并简单地计算平均值。这看起来像

IntWritable one = new IntWritable(1);
DoubleWritable average = new DoubleWritable();
protected void reduce(IntWritable key, Iterable<IntWrtiable> values, Context context) {
    int sum = 0;
    int count = 0;
    for(IntWritable value : values) {
        sum += value.get();
        count++;
    }
    average.set(sum / (double) count);
    context.Write(key, average);
}

我在这里做了很多假设,关于你的输入格式和什么不是,但它们是合理的假设,你应该能够调整它以满足你的确切需求。

我的输出将始终是文本文件,还是可以将平均值直接存储在某种变量中?

您在这里有几个选择。例如,您可以对作业的输出进行后处理(写入单个文件),或者,由于您正在计算单个值,因此您可以将结果存储在计数器中。

于 2013-08-05T16:07:20.620 回答
3

您的映射器读取文本文件并在每一行上应用以下映射函数

map: (key, value)
  time = value[2]
  emit("1", time)

所有 map 调用都会发出键“1”,该键将由一个 reduce 函数处理

reduce: (key, values)
  result = sum(values) / n
  emit("1", result)

由于您使用的是 Hadoop,您可能已经在 map 函数中看到了 StringTokenizer 的使用,您可以使用它来仅获取一行中的时间。您还可以考虑如何计算 n(进程数)的一些方法,例如,您可以在另一个只计算行数的作业中使用计数器。

更新
如果您要执行此作业,则必须将每一行的元组发送到减速器,如果您在多台机器上运行 Hadoop 集群,则可能会阻塞网络。一种更聪明的方法可以计算更接近输入的时间总和,例如通过指定一个组合器:

combine: (key, values)
  emit(key, sum(values))

然后在同一台机器的所有映射函数的结果上执行该组合器,即,两者之间没有网络。然后,reducer 将只获得与集群中的机器一样多的元组,而不是与日志文件中的行一样多。

于 2013-08-05T16:13:13.723 回答