I have the following Hadoop code, with this mapper and reducer:
public static class Map2 extends Mapper<LongWritable, Text, NullWritable, Text>
{
    TreeMap<Text, Text> top10 = new TreeMap<Text, Text>();
    HashMap<String, String> userInfo = new HashMap<String, String>();
    public void setup(Context context) throws IOException, InterruptedException
    {
        try
        {
            URI[] uris = DistributedCache.getCacheFiles(context.getConfiguration());
            FileSystem fs = FileSystem.get(context.getConfiguration());
            if (uris == null || uris.length == 0)
            {
                throw new IOException("Error reading file from distributed cache. No URIs found.");
            }
            String path = "./users.dat";
            fs.copyToLocalFile(new Path(uris[0]), new Path(path));
            BufferedReader br = new BufferedReader(new FileReader(path));
            String line = null;
            while((line = br.readLine()) != null)
            {
                String split[] = line.split("\\::");
                String age = split[2];
                String gender = split[1];
                userInfo.put(split[0], gender + "\t" + age);
            }
            br.close();
        }
        catch(Exception e)
        {
        }
    }
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        try
        {
            String line = value.toString();
            int sum = Integer.parseInt(line.split("\\t")[1]);
            String userID = line.split("\\t")[0];
            String newKey = sum + " " + userID;
            if(userInfo.containsKey(userID))
            {
                String record = userInfo.get(userID);
                String val = userID + "\t" + record + "\t" + sum;
                top10.put(new Text(newKey), new Text(val));
                if (top10.size() > 10)
                {
                    top10.remove(top10.firstKey());
                }
            }
        }
        catch(Exception e)
        {
        }
    }
    protected void cleanup(Context context) throws IOException, InterruptedException
    {
        try
        {
            for (Text s1 : top10.descendingMap().values())
            {
                context.write(NullWritable.get(), s1);
            }
        }
        catch(Exception e)
        {
        }
    }
}
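The mapper keeps its local top 10 in a `TreeMap<Text, Text>` keyed by the string `sum + " " + userID`. `Text` keys compare byte by byte, so `"9 u1"` sorts after `"100 u2"` and the map evicts entries by string order rather than by the numeric sum. A minimal plain-Java sketch of the same top-N idea with a numeric sort key, assuming input lines of the form `userID\tsum` as in `map()` above; `TopNSketch`, `Key` and `topN` are hypothetical names, and no Hadoop classes are used:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class TopNSketch {
    // Hypothetical sort key: numeric sum first, then userID to break ties,
    // so two users with the same sum do not overwrite each other in the map.
    static final class Key implements Comparable<Key> {
        final long sum;
        final String userId;

        Key(long sum, String userId) {
            this.sum = sum;
            this.userId = userId;
        }

        @Override
        public int compareTo(Key other) {
            int bySum = Long.compare(sum, other.sum); // numeric, not string, order
            return bySum != 0 ? bySum : userId.compareTo(other.userId);
        }
    }

    // Keeps the n lines ("userID\tsum") with the largest sums, largest first.
    static List<String> topN(List<String> lines, int n) {
        TreeMap<Key, String> top = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            if (fields.length < 2) {
                continue; // skip malformed input explicitly instead of an empty catch
            }
            top.put(new Key(Long.parseLong(fields[1].trim()), fields[0]), line);
            if (top.size() > n) {
                top.remove(top.firstKey()); // evict the current smallest sum
            }
        }
        return new ArrayList<>(top.descendingMap().values());
    }

    public static void main(String[] args) {
        // Keys built as sum + " " + userID compare character by character:
        System.out.println("9 u1".compareTo("100 u2") > 0); // true
        System.out.println(topN(List.of("u1\t9", "u2\t100", "u3\t42"), 2));
    }
}
```

With `n = 2` the sketch keeps the records for `u2` (100) and `u3` (42), while the string-keyed map in the mapper would rank `"9 u1"` above both.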
public static class Reduce2 extends Reducer<NullWritable, Text, NullWritable, Text>
{
    private TreeMap<Text, Text> top10 = new TreeMap<Text, Text>();
    public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
        try
        {
            String line = values.toString();
            String sum = line.split("\\t")[3];
            String userID = line.split("\\t")[0];
            String gender = line.split("\\t")[1];
            String age = line.split("\\t")[2];
            String newKey = sum + " " + userID;
            String val = userID + "\t" + gender + "\t" + age + "\t" + sum;
            top10.put(new Text(newKey), new Text(val));
            if(top10.size() > 10)
            {
                top10.remove(top10.firstKey());
            }
        }
        catch(Exception e)
        {
        }
    }
    protected void cleanup(Context context) throws IOException, InterruptedException
    {
        try
        {
            for (Text s1 : top10.descendingMap().values())
            {
                context.write(NullWritable.get(), s1);
            }
        }
        catch(Exception e)
        {
        }
    }
}
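`reduce()` receives every mapper value for the single `NullWritable` key through an `Iterable<Text>`, but the code above calls `values.toString()` instead of looping over it. Assuming the framework's value iterable keeps the default `Object.toString()` (something like `ClassName@1b6d3586`), that string contains no tab, so `split("\\t")[3]` throws `ArrayIndexOutOfBoundsException`; the empty `catch` swallows it, nothing is put into `top10`, and `cleanup()` writes zero records. A plain-Java sketch that reproduces this with a hypothetical anonymous `Iterable<String>` standing in for Hadoop's value iterable (`ReduceSketch`, `opaque`, `brokenSum` and `sums` are illustrative names), followed by the per-value loop:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class ReduceSketch {
    // Mirrors the original reducer: stringify the whole Iterable, then split.
    // Returns null where the original would swallow the exception silently.
    static String brokenSum(Iterable<String> values) {
        try {
            return values.toString().split("\t")[3];
        } catch (ArrayIndexOutOfBoundsException e) {
            return null;
        }
    }

    // The fix: visit each value; each one is "userID\tgender\tage\tsum".
    static List<String> sums(Iterable<String> values) {
        List<String> out = new ArrayList<>();
        for (String line : values) {
            String[] fields = line.split("\t");
            if (fields.length >= 4) {
                out.add(fields[3]);
            }
        }
        return out;
    }

    // Hypothetical stand-in for the framework's value iterable. A plain List
    // would not do, because List.toString() prints its elements (with tabs).
    static Iterable<String> opaque(List<String> backing) {
        return new Iterable<String>() {
            @Override
            public Iterator<String> iterator() {
                return backing.iterator();
            }
        };
    }

    public static void main(String[] args) {
        Iterable<String> values = opaque(List.of("1\tF\t25\t42", "2\tM\t35\t7"));
        System.out.println(brokenSum(values)); // null
        System.out.println(sums(values));      // [42, 7]
    }
}
```

In the real reducer, the loop would be `for (Text value : values)`, and each value should be copied (`new Text(value)`) before it is stored in `top10`, because Hadoop reuses the same `Text` instance across iterations.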
The driver code is:
Configuration conf2 = new Configuration();
DistributedCache.addCacheFile(new Path("/Spring2014_HW-1/input_HW-1/users.dat").toUri(), conf2);
Job job2 = new Job(conf2, "Phase2");
job2.setOutputKeyClass(NullWritable.class);
job2.setOutputValueClass(Text.class);
job2.setJarByClass(MapSideJoin.class);
job2.setMapperClass(Map2.class);
job2.setReducerClass(Reduce2.class);
job2.setInputFormatClass(TextInputFormat.class);
job2.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job2, new Path(args[1]));
FileOutputFormat.setOutputPath(job2, new Path(args[2]));
//job2.setNumReduceTasks(1);
job2.waitForCompletion(true);
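The driver leaves `job2.setNumReduceTasks(1)` commented out. Hadoop's default `HashPartitioner` sends a key to partition `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`, and `NullWritable.hashCode()` returns 0, so every record from `Map2` lands in partition 0 however many reducers the job runs; any other reducers get no input. A plain-Java sketch of that arithmetic, where `PartitionSketch.partition` is a hypothetical helper standing in for the real partitioner class:

```java
class PartitionSketch {
    // Same arithmetic as Hadoop's default HashPartitioner.getPartition():
    // clear the sign bit of the hash, then take the remainder by the reducer count.
    static int partition(int keyHashCode, int numReduceTasks) {
        return (keyHashCode & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int nullWritableHash = 0; // NullWritable.hashCode() always returns 0
        for (int reducers = 1; reducers <= 4; reducers++) {
            // Every record keyed by NullWritable goes to partition 0.
            System.out.println(reducers + " reducer(s) -> partition "
                    + partition(nullWritableHash, reducers));
        }
        // A key with a varying hash (e.g. a user ID string) spreads out instead.
        System.out.println(partition("u42".hashCode(), 4));
    }
}
```

This routing is what lets a single `Reduce2` see all the mappers' local top-10 lists; setting one reducer explicitly only avoids launching extra reducers that would receive nothing.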
Even though my reducer emits output, the job counters show Map output records = 10 and Reduce output records = 0. Where did the reducer's output go?
Thanks.