-2

我在没有使用工具的情况下为单机集群编写了 mapreduce java 代码,它会在多节点集群上工作还是我必须进行更改?以下代码标记字符串并计算每个文本文件的词频

public class tr 
    {
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text,Text,IntWritable>
    {
       Text word=new Text();
       IntWritable one=new IntWritable(1);
               String imptoken;
       public static  List<String> stopwords=new ArrayList<String>();
       public void map(LongWritable key,Text value,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException
       {
                       addwords();
              String line=value.toString();
                      line=line.replaceAll("[^A-Za-z]"," ").toLowerCase();
                      StringTokenizer st=new StringTokenizer(line);
          while(st.hasMoreTokens())
        {
                 imptoken=st.nextToken();
                             if(stopwords.contains(imptoken))
               {

               }               
               else
               {
                   word.set(imptoken);
                   output.collect(word, one); 
               }                              
                   }
    }
          public void addwords() throws IOException
      {
     FileSystem fs = FileSystem.get(new Configuration());
     Path stop=new Path("/user/hduser/stopword.txt");
     BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(stop)));
     String stopword=br.readLine();
     while(stopword!=null)
     {
         stopwords.add(stopword);
         stopword=br.readLine();
     }

      }

}
public static class Reduce extends MapReduceBase implements Reducer<Text,IntWritable, Text, IntWritable>
{
    public void reduce(Text key,Iterator<IntWritable> value,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException
    {
        int sum=0;
        while(value.hasNext())
        {
            sum=sum+value.next().get();
        }   
                   /* Path paths=new Path("/user/hduser/input1/");
        FileSystem fs=FileSystem.get(new Configuration());
        FileStatus[] status = fs.listStatus(paths);
                    Path[] list = FileUtil.stat2Paths(status);
                    String keystr=key.toString();
                    for(Path file : list)
                    { 
                       BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(file)));
               String word=br.readLine();
               while(word!=null)
               {
                 if(word.equals(keystr))
                 {
                     sum=0;
                 }
                 word=br.readLine();
                }

                     }*/

                     output.collect(key, new IntWritable(sum));
            }       
    }

public static void main(String args[]) throws IOException
{             
    FileSystem fs = FileSystem.get(new Configuration());
    Path[] paths = new Path[args.length];
    for (int i = 0; i < paths.length; i++) 
    {
        paths[i] = new Path(args[i]);
    }

    FileStatus[] status = fs.listStatus(paths);
            Path[] listedPaths = FileUtil.stat2Paths(status);

    FSDataInputStream in = null;
    for (Path p : listedPaths) 
    {
         JobConf conf = new JobConf(tr.class);
             conf.setJobName("tr");

             conf.setOutputKeyClass(Text.class);
             conf.setOutputValueClass(IntWritable.class);

             conf.setMapperClass(Map.class);
             conf.setCombinerClass(Reduce.class);
             conf.setReducerClass(Reduce.class);

             conf.setInputFormat(TextInputFormat.class);
                     conf.setOutputFormat(TextOutputFormat.class);

             String name=p.getName();
             String absolutepath=p.getParent().toString()+"/"+name;

             FileInputFormat.setInputPaths(conf, new Path(absolutepath));
                     FileOutputFormat.setOutputPath(conf, new Path(args[1]));

                     JobClient.runJob(conf); 

                     Path local=new Path("/home/hduser/meproj/projectfiles/");
                     Path source=new Path(args[1]+"/"+"part-00000");

                     fs.copyToLocalFile(source, local);

                     File  file=new File("/home/hduser/meproj/projectfiles/part-00000");
                     file.renameTo(new File("/home/hduser/meproj/projectfiles/"+name));
                     fs.delete(new Path(args[1]), true);
    }
}


}
4

1 回答 1

1

当您使用它为 Hadoop 编写程序时,它将适用于所有集群设置,除非您专门做一些事情来破坏它,例如在一台机器上处理本地文件。

您正在以独立于设置的方式(您应该这样做)在 Mapper 和 Reducer 中进行工作,因此它应该可以在任何地方工作。

这与您的问题无关,但您不应该遍历文件并在每个路径上运行独立的作业。真的,您应该在所有这些上运行一项工作。您可以将所有这些单独的路径放在同一个文件夹中,并将该文件夹指定为输入。或者您可以在多个路径上运行 hadoop(请参阅此答案

于 2013-04-30T23:25:26.450 回答