0

无论如何,每个减速器进程可以确定它必须处理的元素或记录的数量吗?

4

2 回答 2

3

简短的回答 - 提前不,reducer 不知道迭代器支持多少值。您可以做到这一点的唯一方法是在迭代时进行计数,但是您不能再次对可迭代对象进行重新迭代。

长答案 - 支持可迭代实际上是序列化键/值对的排序字节数组。reducer 有两个比较器 - 一个用于按键顺序对键/值对进行排序,第二个用于确定键之间的边界(称为键分组器)。通常,键分组器与键排序比较器相同。

当迭代特定键的值时,底层上下文检查数组中的下一个键,并使用分组比较器与前一个键进行比较。如果比较器确定它们相等,则迭代继续。否则,此特定键的迭代结束。因此,您可以看到您无法提前确定为任何特定键传递的值。

如果您创建一个复合键,例如 Text/IntWritable 对,您实际上可以看到这一点。对于 compareTo 方法,首先按 Text 排序,然后按 IntWritable 字段排序。接下来创建一个比较器用作组比较器,它只考虑键的文本部分。现在,当您迭代 reducer 中的值时,您应该能够观察到键的 IntWritable 部分随每次迭代而变化。

我之前用来演示这种情况的一些代码可以在这个pastebin上找到

于 2012-12-20T01:15:00.090 回答
1

您的 reducer 类必须扩展 MapReducer Reduce 类:

Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

然后必须使用扩展 Reduce 类中指定的 KEYIN/VALUEIN 参数来实现 reduce 方法

reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context)

与给定键关联的值可以通过

int count = 0;
Iterator<VALUEIN> it = values.iterator();
while(it.hasNext()){
  it.Next();
  count++;
}

尽管我建议在您的其他处理的同时进行此计数,以免两次通过您的值集。

编辑

这是一个向量的示例向量,当您添加到它时会动态增长(因此您不必静态声明数组,因此不需要设置值的大小)。这对于非常规数据最有效(即输入 csv 文件中每一行的列数都不相同),但开销最大。

Vector table = new Vector();

Iterator<Text> it = values.iterator();
while(it.hasNext()){

  Text t = it.Next();
  String[] cols = t.toString().split(",");   

  int i = 0;
  Vector row = new Vector(); //new vector will be our row
  while(StringUtils.isNotEmpty(cols[i])){
    row.addElement(cols[i++]); //here were adding a new column for every value in the csv row
  }

  table.addElement(row);
}

然后您可以通过访问第 N 行的第 M 列

table.get(N).get(M);

现在,如果您知道将设置列数,您可以修改它以使用数组向量,这可能会更快/更节省空间。

于 2012-12-20T00:13:33.333 回答