I am simply trying to access a distributed cache file from the mapper, and I set each record (string) from the cache file as a key, just to check whether I am actually getting the contents of the cache file (stop.txt). But what I get as keys is the content of the actual input file (input.txt) instead. Please advise. Both the cache file and the input file are in HDFS.
Below is my actual code:
package com.cache;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
public class DistributedCacheTest {

    public static class MyMap extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        public Path[] localArchives;
        public Path[] localFiles;
        BufferedReader cacheReader;

        // Look up the local (task-side) paths of the cached archives/files.
        public void configure(JobConf job) {
            try {
                localArchives = DistributedCache.getLocalCacheArchives(job);
                localFiles = DistributedCache.getLocalCacheFiles(job);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter report)
                throws IOException {
            if (localFiles != null && localFiles.length > 0) {
                System.out.println("Inside map(): " + localFiles[0].toString());
                String line;
                try {
                    // Read the localized copy of the cache file and emit each
                    // of its lines as a key, just to verify its contents.
                    cacheReader = new BufferedReader(new FileReader(localFiles[0].toString()));
                    while ((line = cacheReader.readLine()) != null) {
                        System.out.println("**********" + line);
                        output.collect(new Text(line), new IntWritable(1));
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    if (cacheReader != null) { // avoid an NPE if the reader was never opened
                        cacheReader.close();
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException, URISyntaxException {
        if (args.length != 2) {
            System.err.println("Usage: DistributedCacheTest <input path> <output path>");
            System.exit(-1);
        }

        JobConf job = new JobConf(DistributedCacheTest.class);
        job.setJobName("DistriTestjob");
        DistributedCache.addCacheFile(new URI("/user/hadoop/stop.txt"), job);
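        // My understanding (may be wrong): stop.txt should be shipped to every
        // task node, and getLocalCacheFiles() in configure() should then return
        // its localized path. The URI has no fragment (#name), so I am not
        // relying on symlinks in the task's working directory.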
        job.setMapperClass(MyMap.class);
        job.setReducerClass(IdentityReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        JobClient.runJob(job);
    }
}
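
For reference, here is a minimal standalone sketch that reads the file directly from HDFS, just to confirm what stop.txt should contain. It assumes the same /user/hadoop/stop.txt path as above; StopFilePeek is just a throwaway name for this check, not part of the job:

package com.cache;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StopFilePeek {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);          // default (HDFS) filesystem
        Path stopFile = new Path("/user/hadoop/stop.txt"); // assumed cache file path
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(stopFile)));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);              // expected: the stop.txt records
            }
        } finally {
            reader.close();
        }
    }
}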