我对 MapReduce 非常陌生,我完成了一个 Hadoop 字数统计示例。
在该示例中,它生成未排序的字数文件(带有键值对)。那么是否可以通过将另一个 MapReduce 任务与之前的任务结合起来,按单词出现的数量对其进行排序?
我对 MapReduce 非常陌生,我完成了一个 Hadoop 字数统计示例。
在该示例中,它生成未排序的字数文件(带有键值对)。那么是否可以通过将另一个 MapReduce 任务与之前的任务结合起来,按单词出现的数量对其进行排序?
在简单的 word count map reduce 程序中,我们得到的输出是按单词排序的。示例输出可以是:
Apple 1
Boy 30
Cat 2
Frog 20
Zebra 1
如果您想根据单词出现的次数对输出进行排序,即以下格式
1 Apple
1 Zebra
2 Cat
20 Frog
30 Boy
您可以创建另一个MR 程序使用下面的 mapper 和 reducer,其中输入将是从简单的字数统计程序获得的输出。
class Map1 extends MapReduceBase implements Mapper<Object, Text, IntWritable, Text>
{
public void map(Object key, Text value, OutputCollector<IntWritable, Text> collector, Reporter arg3) throws IOException
{
String line = value.toString();
StringTokenizer stringTokenizer = new StringTokenizer(line);
{
int number = 999;
String word = "empty";
if(stringTokenizer.hasMoreTokens())
{
String str0= stringTokenizer.nextToken();
word = str0.trim();
}
if(stringTokenizer.hasMoreElements())
{
String str1 = stringTokenizer.nextToken();
number = Integer.parseInt(str1.trim());
}
collector.collect(new IntWritable(number), new Text(word));
}
}
}
class Reduce1 extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text>
{
public void reduce(IntWritable key, Iterator<Text> values, OutputCollector<IntWritable, Text> arg2, Reporter arg3) throws IOException
{
while((values.hasNext()))
{
arg2.collect(key, values.next());
}
}
}
Hadoop MapReduce wordcount 示例的输出按键排序。所以输出应该按字母顺序排列。
使用 Hadoop,您可以创建自己的关键对象来实现WritableComparable
允许您覆盖该compareTo
方法的接口。这允许您控制排序顺序。
要创建按出现次数排序的输出,您可能必须添加另一个 MapReduce 作业来处理您所说的第一个输出。第二项工作将非常简单,甚至可能不需要减少阶段。您只需要实现自己的Writable
密钥对象来包装单词及其频率。自定义可写看起来像这样:
public class MyWritableComparable implements WritableComparable {
// Some data
private int counter;
private long timestamp;
public void write(DataOutput out) throws IOException {
out.writeInt(counter);
out.writeLong(timestamp);
}
public void readFields(DataInput in) throws IOException {
counter = in.readInt();
timestamp = in.readLong();
}
public int compareTo(MyWritableComparable w) {
int thisValue = this.value;
int thatValue = ((IntWritable)o).value;
return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
}
}
我从这里抓住了这个例子。
您可能应该覆盖hashCode
,equals
以及toString
。
在 Hadoop 中,排序是在 Map 和 Reduce 阶段之间完成的。按单词出现排序的一种方法是使用不分组的自定义组比较器。因此,每次调用 reduce 都只是键和一个值。
public class Program {
public static void main( String[] args) {
conf.setOutputKeyClass( IntWritable.class);
conf.setOutputValueClass( Text.clss);
conf.setMapperClass( Map.class);
conf.setReducerClass( IdentityReducer.class);
conf.setOutputValueGroupingComparator( GroupComparator.class);
conf.setNumReduceTasks( 1);
JobClient.runJob( conf);
}
}
public class Map extends MapReduceBase implements Mapper<Text,IntWritable,IntWritable,Text> {
public void map( Text key, IntWritable value, OutputCollector<IntWritable,Text>, Reporter reporter) {
output.collect( value, key);
}
}
public class GroupComaprator extends WritableComparator {
protected GroupComparator() {
super( IntWritable.class, true);
}
public int compare( WritableComparable w1, WritableComparable w2) {
return -1;
}
}
正如您所说,一种可能性是编写两个工作来执行此操作。第一份工作:简单的字数统计示例
第二份工作:做分拣部分。
伪代码可以是:
注意:第一个作业生成的输出文件将作为第二个作业的输入
Mapper2(String _key, Intwritable _value){
//just reverse the position of _value and _key. This is useful because reducer will get the output in the sorted and shuffled manner.
emit(_value,_key);
}
Reduce2(IntWritable valueofMapper2,Iterable<String> keysofMapper2){
//At the reducer side, all the keys that have the same count are merged together.
for each K in keysofMapper2{
emit(K,valueofMapper2); //This will sort in ascending order.
}
}
您还可以按降序排序,这样可以编写一个单独的比较器类来解决问题。在作业中包含比较器:
Job.setComparatorclass(Comparator.class);
此比较器将在发送到减速器端之前按降序对值进行排序。因此,在 reducer 上,您只需发出值。