无论如何,每个减速器进程可以确定它必须处理的元素或记录的数量吗?
2 回答
简短的回答 - 提前不,reducer 不知道迭代器支持多少值。您可以做到这一点的唯一方法是在迭代时进行计数,但是您不能再次对可迭代对象进行重新迭代。
长答案 - 支持可迭代实际上是序列化键/值对的排序字节数组。reducer 有两个比较器 - 一个用于按键顺序对键/值对进行排序,第二个用于确定键之间的边界(称为键分组器)。通常,键分组器与键排序比较器相同。
当迭代特定键的值时,底层上下文检查数组中的下一个键,并使用分组比较器与前一个键进行比较。如果比较器确定它们相等,则迭代继续。否则,此特定键的迭代结束。因此,您可以看到您无法提前确定为任何特定键传递的值。
如果您创建一个复合键,例如 Text/IntWritable 对,您实际上可以看到这一点。对于 compareTo 方法,首先按 Text 排序,然后按 IntWritable 字段排序。接下来创建一个比较器用作组比较器,它只考虑键的文本部分。现在,当您迭代 reducer 中的值时,您应该能够观察到键的 IntWritable 部分随每次迭代而变化。
我之前用来演示这种情况的一些代码可以在这个pastebin上找到
您的 reducer 类必须扩展 MapReducer Reduce 类:
Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
然后必须使用扩展 Reduce 类中指定的 KEYIN/VALUEIN 参数来实现 reduce 方法
reduce(KEYIN key, Iterable<VALUEIN> values,
org.apache.hadoop.mapreduce.Reducer.Context context)
与给定键关联的值可以通过
int count = 0;
Iterator<VALUEIN> it = values.iterator();
while(it.hasNext()){
it.Next();
count++;
}
尽管我建议在您的其他处理的同时进行此计数,以免两次通过您的值集。
编辑
这是一个向量的示例向量,当您添加到它时会动态增长(因此您不必静态声明数组,因此不需要设置值的大小)。这对于非常规数据最有效(即输入 csv 文件中每一行的列数都不相同),但开销最大。
Vector table = new Vector();
Iterator<Text> it = values.iterator();
while(it.hasNext()){
Text t = it.Next();
String[] cols = t.toString().split(",");
int i = 0;
Vector row = new Vector(); //new vector will be our row
while(StringUtils.isNotEmpty(cols[i])){
row.addElement(cols[i++]); //here were adding a new column for every value in the csv row
}
table.addElement(row);
}
然后您可以通过访问第 N 行的第 M 列
table.get(N).get(M);
现在,如果您知道将设置列数,您可以修改它以使用数组向量,这可能会更快/更节省空间。