I wrote MapReduce Java code for a single-node cluster without using any tools. Will it work on a multi-node cluster, or do I have to make changes? The code below tokenizes the input and counts the word frequency of each text file.
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class tr {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

        Text word = new Text();
        IntWritable one = new IntWritable(1);
        String imptoken;
        public static List<String> stopwords = new ArrayList<String>();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            // Reloads the stop-word list from HDFS for every input record.
            addwords();
            String line = value.toString();
            // Keep only letters and lower-case everything before tokenizing.
            line = line.replaceAll("[^A-Za-z]", " ").toLowerCase();
            StringTokenizer st = new StringTokenizer(line);
            while (st.hasMoreTokens()) {
                imptoken = st.nextToken();
                // Emit (word, 1) only for tokens that are not stop words.
                if (!stopwords.contains(imptoken)) {
                    word.set(imptoken);
                    output.collect(word, one);
                }
            }
        }

        public void addwords() throws IOException {
            // Read the stop-word file from HDFS, one word per line.
            FileSystem fs = FileSystem.get(new Configuration());
            Path stop = new Path("/user/hduser/stopword.txt");
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(stop)));
            String stopword = br.readLine();
            while (stopword != null) {
                stopwords.add(stopword);
                stopword = br.readLine();
            }
            br.close();
        }
    }
    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            // Sum all the 1s emitted for this word.
            int sum = 0;
            while (value.hasNext()) {
                sum = sum + value.next().get();
            }
            /* Path paths = new Path("/user/hduser/input1/");
            FileSystem fs = FileSystem.get(new Configuration());
            FileStatus[] status = fs.listStatus(paths);
            Path[] list = FileUtil.stat2Paths(status);
            String keystr = key.toString();
            for (Path file : list) {
                BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(file)));
                String word = br.readLine();
                while (word != null) {
                    if (word.equals(keystr)) {
                        sum = 0;
                    }
                    word = br.readLine();
                }
            } */
            output.collect(key, new IntWritable(sum));
        }
    }
    public static void main(String args[]) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());

        // Treat every command-line argument as a path and expand it to the files it contains.
        Path[] paths = new Path[args.length];
        for (int i = 0; i < paths.length; i++) {
            paths[i] = new Path(args[i]);
        }
        FileStatus[] status = fs.listStatus(paths);
        Path[] listedPaths = FileUtil.stat2Paths(status);

        // Run one MapReduce job per input file; args[1] is reused as the output directory each time.
        for (Path p : listedPaths) {
            JobConf conf = new JobConf(tr.class);
            conf.setJobName("tr");
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);
            conf.setMapperClass(Map.class);
            conf.setCombinerClass(Reduce.class);
            conf.setReducerClass(Reduce.class);
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);

            String name = p.getName();
            String absolutepath = p.getParent().toString() + "/" + name;
            FileInputFormat.setInputPaths(conf, new Path(absolutepath));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));
            JobClient.runJob(conf);

            // Copy the single reducer's output to the local file system on the client machine,
            // rename it after the input file, and delete the HDFS output directory so the next
            // job can reuse the same output path.
            Path local = new Path("/home/hduser/meproj/projectfiles/");
            Path source = new Path(args[1] + "/" + "part-00000");
            fs.copyToLocalFile(source, local);
            File file = new File("/home/hduser/meproj/projectfiles/part-00000");
            file.renameTo(new File("/home/hduser/meproj/projectfiles/" + name));
            fs.delete(new Path(args[1]), true);
        }
    }
}
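For comparison, here is a minimal sketch (not part of my current code) of how the mapper could load the stop words once per task in configure(), which the old mapred API calls when each task starts, instead of re-reading the HDFS file on every map() call. It reuses the same imports and the same /user/hduser/stopword.txt path as above; treat it as an assumption-laden sketch, not a tested drop-in replacement.

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

        private final Text word = new Text();
        private final IntWritable one = new IntWritable(1);
        private final List<String> stopwords = new ArrayList<String>();

        // configure() runs once per task attempt, on whichever node the task is scheduled,
        // so the stop-word file is read once per task instead of once per input record.
        public void configure(JobConf job) {
            try {
                FileSystem fs = FileSystem.get(job);
                // Same HDFS path as in the code above; because it is in HDFS,
                // every node of a multi-node cluster can open it.
                BufferedReader br = new BufferedReader(
                        new InputStreamReader(fs.open(new Path("/user/hduser/stopword.txt"))));
                String stopword;
                while ((stopword = br.readLine()) != null) {
                    stopwords.add(stopword);
                }
                br.close();
            } catch (IOException e) {
                throw new RuntimeException("could not load stop words", e);
            }
        }

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString().replaceAll("[^A-Za-z]", " ").toLowerCase();
            StringTokenizer st = new StringTokenizer(line);
            while (st.hasMoreTokens()) {
                String token = st.nextToken();
                if (!stopwords.contains(token)) {
                    word.set(token);
                    output.collect(word, one);
                }
            }
        }
    }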