i'm trying to implement some Map/Reduce job with hadoop for a college assignment. but at the moment i'm totally stuck while implementing a custom FileInputFormat class to get the whole content from a file into my mapper.
i took the example out of "hadoop: the definitive guide" without no changes. i can compile my source code but if i run it it will throw this exception (at the moment i use hadoop 1.0.2 on a debian 5.0)
Exception in thread "main" java.lang.RuntimeException: java.lang.NoSuchMethodException: org.myorg.ExampleFileInputFormat$WholeFileInputFormat.<init>()
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
at org.apache.hadoop.mapred.JobConf.getInputFormat(JobConf.java:575)
at org.apache.hadoop.mapred.JobClient.writeOldSplits(JobClient.java:989)
at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:981)
at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:174)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:897)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:824)
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1261)
at org.myorg.ExampleFileInputFormat.run(ExampleFileInputFormat.java:163)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.myorg.ExampleFileInputFormat.main(ExampleFileInputFormat.java:172)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: java.lang.NoSuchMethodException: org.myorg.ExampleFileInputFormat$WholeFileInputFormat.<init>()
at java.lang.Class.getConstructor0(Class.java:2706)
at java.lang.Class.getDeclaredConstructor(Class.java:1985)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:109)
... 21 more
i'm kind of frustrated because i don't understand what happens and i don't find anything using web search. Maybe some of you could take a look on my source. it's kind of stripped down at the moment for the purpose of debugging.
package org.myorg;
/*
*
*
*/
import java.io.IOException;
import java.util.*;
import java.io.*;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configured;
public class ExampleFileInputFormat extends Configured implements Tool {
/*
* <generics>
*/
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
return false;
}
@Override
public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
return new WholeFileRecordReader((FileSplit) split, job);
}
}
public class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private boolean processed = false;
public WholeFileRecordReader(FileSplit fileSplit, Configuration conf) throws IOException {
this.fileSplit = fileSplit;
this.conf = conf;
}
@Override
public NullWritable createKey() {
return NullWritable.get();
}
@Override
public BytesWritable createValue() {
return new BytesWritable();
}
@Override
public long getPos() throws IOException {
return processed ? fileSplit.getLength() : 0;
}
@Override
public float getProgress() throws IOException {
return processed ? 1.0f : 0.0f;
}
@Override
public boolean next(NullWritable key, BytesWritable value) throws IOException {
if (!processed) {
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
public void close() throws IOException {
// do nothing
}
}
/* </generics> */
/*
* <Task1>:
* */
public static class ExampleMap extends MapReduceBase implements Mapper<NullWritable, BytesWritable, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(NullWritable key, BytesWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
output.collect(new Text("test"), one);
}
}
public static class ExampleReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
/* </Task1> */
/*
* <run>
**/
public int run(String[] args) throws Exception {
if (args.length != 3) {
printUsage();
return 1;
}
String useCase = args[0];
String inputPath = args[1];
String outputPath = args[2];
deleteOldOutput(outputPath);
JobConf conf = new JobConf(ExampleFileInputFormat.class);
FileOutputFormat.setOutputPath(conf, new Path(outputPath));
FileInputFormat.setInputPaths(conf, new Path(inputPath));
/* conf: Task1 */
if (useCase.equals("cc_p")) {
conf.setJobName("WordCount");
/* Output: Key:Text -> Value:Integer */
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setOutputFormat(TextOutputFormat.class);
/* Input: Key.Text -> Value:Text */
conf.setInputFormat(WholeFileInputFormat.class);
conf.setMapperClass(ExampleMap.class);
conf.setReducerClass(ExampleReduce.class);
}
/* default-option: Exit */
else {
printUsage();
return 1;
}
JobClient.runJob(conf);
return 0;
}
/* </run> */
/*
* <Main>
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new ExampleFileInputFormat(), args);
System.exit(res);
}
/* </Main> */
/*
* <Helper>
*/
private void printUsage() {
System.out.println("usage: [usecase] [input-path] [output-path]");
return;
}
private void deleteOldOutput(String outputPath) throws IOException {
// Delete the output directory if it exists already
Path outputDir = new Path(outputPath);
FileSystem.get(getConf()).delete(outputDir, true);
}
/* </Helper-> */
}
Can anyone help me out?
Greetings from Germany, Alex