我正在尝试编写一个 Hadoop map/reduce 类,该类读取包含演员列表和他们播放过的电影(每行一部电影)的文本文件,并返回每个演员参与的电影数量。
最后,我希望结果按电影数量排序(升序或降序都可以)。但是,我的代码似乎按电影标题中的字符数对结果进行了排序。我已经尝试了我能想到的一切,包括反转输出(文本、IntWritable 到 IntWritable、文本)并使用不同的比较器,但我无法让它按电影计数对结果进行排序。
我敢肯定这是非常简单的事情,但我一生都无法弄清楚。任何建议将不胜感激。
数据文件的摘录:
Chan, Jackie (I) The Forbidden Kingdom 2008
Chan, Jackie (I) Kung Fu Panda 2 2011
Chan, Jackie (I) Shanghai Noon 2000
Chan, Jackie (I) Pik lik for 1995
Chan, Jackie (I) The Karate Kid 2010
Chan, Jackie (I) Shanghai Knights 2003
Chan, Jackie (I) Around the World in 80 Days 2004
Chan, Jackie (I) Rush Hour 1998
Chan, Jackie (I) The Tuxedo 2002
Chan, Jackie (I) Kung Fu Panda 2008
Chan, Jackie (I) Rush Hour 2 2001
Chan, Jackie (I) Rush Hour 3 2007
Davi, Robert Licence to Kill 1989
Davi, Robert Die Hard 1988
Davi, Robert The Hot Chick 2002
Davi, Robert The Goonies 1985
我的代码如下:
// MovieCountByActor.java
package ucsc.hadoop.homework2;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
// import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import ucsc.hadoop.util.ConfigurationUtil;
public class MovieCountByActor extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(MovieCountByActor.class);
public int run(String[] args) throws Exception {
// Configuration conf = getConf();
JobConf conf = new JobConf(getConf(), MovieCountByActor.class);
conf.setOutputKeyComparatorClass(CountSort.class);
conf.setOutputValueGroupingComparator(CountSort.class);
if (args.length != 2) {
System.err.println("Usage: moviecountbyactor <in> <out>");
System.exit(2);
}
ConfigurationUtil.dumpConfigurations(conf, System.out);
LOG.info("input: " + args[0] + " output: " + args[1]);
Job job = new Job(conf, "movie count");
job.setJarByClass(MovieCountByActor.class);
job.setMapperClass(MovieTokenizerMapper.class);
job.setReducerClass(MovieCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setSortComparatorClass(CountSort.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
return (result) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MovieCountByActor(), args);
System.exit(exitCode);
}
public static class MovieTokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
private final static Text ACTOR = new Text();
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] tokens = value.toString().split("\\t");
String actor = "";
if (tokens.length == 3) {
actor = tokens[0];
ACTOR.set(actor);
context.write(ACTOR, ONE);
}
}
}
public static class MovieCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text actor, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int movieCountPerActor = 0;
for (IntWritable count : values) {
movieCountPerActor += count.get();
}
result.set(movieCountPerActor);
context.write(actor, result);
}
}
public static class CountSort extends WritableComparator {
protected CountSort() {
super (IntWritable.class);
}
@Override
public int compare(byte[] b1, int j1, int k1, byte[] b2, int j2, int k2) {
Integer a = ByteBuffer.wrap(b1, j1, k1).getInt();
Integer b = ByteBuffer.wrap(b2, j2, k2).getInt();
return a.compareTo(b) * -1;
}
}
}