
I'm just trying to access the distributed cache file from the mapper, and I emit each record (string) from the cache file as a key simply to check whether I'm really reading the cache file (stop.txt). But what I get as keys is the content of the actual input file (input.txt) instead. Please advise; both the cache file and the input file are in HDFS.
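(For context, the two files reach the job through different channels: only the path added through FileInputFormat feeds key/value records into map(), while a file registered with addCacheFile is merely localized to each task node for manual reading. Assuming input.txt is the path passed as args[0] below, the two driver calls are:

DistributedCache.addCacheFile(new URI("/user/hadoop/stop.txt"), job); // side file, opened by hand in the mapper
FileInputFormat.addInputPath(job, new Path(args[0]));                 // the input, e.g. input.txt; map() iterates over its records
)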

Below is my actual code:

package com.cache;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class DistributedCacheTest  {

    public static class MyMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

        public Path[] localArchives;
        public Path[] localFiles;
        BufferedReader cacheReader;

        public void configure(JobConf job) {
            // Get the localized cache archives/files.
            try {
                localArchives = DistributedCache.getLocalCacheArchives(job);
                localFiles = DistributedCache.getLocalCacheFiles(job);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }


        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter report)
                throws IOException {

            if (localFiles != null && localFiles.length > 0) {
                System.out.println("Inside map(): " + localFiles[0].toString());

                String line;
                try {
                    cacheReader = new BufferedReader(new FileReader(localFiles[0].toString()));
                    // Emit every line of the cache file as a key, just to verify its contents.
                    while ((line = cacheReader.readLine()) != null) {
                        System.out.println("**********" + line);
                        output.collect(new Text(line), new IntWritable(1));
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    // Guard against an NPE if the FileReader constructor threw.
                    if (cacheReader != null) {
                        cacheReader.close();
                    }
                }
            }
        }

    }
    public static void main(String[] args) throws IOException, InterruptedException,
            ClassNotFoundException, URISyntaxException {
        if (args.length != 2) {
            System.err.println("Usage: DistributedCacheTest <input path> <output path>");
            System.exit(-1);
        }
        JobConf job = new JobConf(DistributedCacheTest.class);
        job.setJobName("DistriTestjob");
        // Register the HDFS file stop.txt with the distributed cache.
        DistributedCache.addCacheFile(new URI("/user/hadoop/stop.txt"), job);

        job.setMapperClass(MyMap.class);
        job.setReducerClass(IdentityReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        JobClient.runJob(job);
    }

}
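
For what it's worth, a minimal diagnostic sketch (using the same old mapred API and imports as the listing above, with hypothetical log text) that prints what the framework actually localized, run once from configure() before any map() call:

    // Diagnostic only: list each localized cache file and its first line.
    public void configure(JobConf job) {
        try {
            Path[] cached = DistributedCache.getLocalCacheFiles(job);
            if (cached == null || cached.length == 0) {
                System.out.println("No cache files were localized");
                return;
            }
            for (Path p : cached) {
                BufferedReader reader = new BufferedReader(new FileReader(p.toString()));
                System.out.println("cache file: " + p + " first line: " + reader.readLine());
                reader.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }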
