I'm trying to create a simple map-reduce example. Here is my code:
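import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Main {
    public static void main(String... args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = new Job();
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setNumReduceTasks(5);
        job.setJarByClass(Main.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));   // input directory
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory
        FileOutputFormat.setCompressOutput(job, false);          // no output compression
        job.waitForCompletion(true);
        System.out.println("Done");
    }
}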
Here is my mapper:
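import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Emit the whole input line as the key with a count of 1
        context.write(value, new LongWritable(1));
    }
}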
and MyReducer:
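import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private MultipleOutputs<Text, LongWritable> mos;

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the counts emitted by the mapper for this line
        long sum = 0;
        Iterator<LongWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            sum += iterator.next().get();
        }
        // Write through MultipleOutputs under the base output path "Tabyretka"
        mos.write(key, new LongWritable(sum), "Tabyretka");
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        mos = new MultipleOutputs<Text, LongWritable>(context);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}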
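For reference, the job counts how many times each distinct input line occurs: MyMapper emits (line, 1) and MyReducer sums those ones per line. Below is a plain-Java sketch of the same computation with no Hadoop involved. LineCountSketch and countLines are hypothetical names, and the TreeMap is used only to make the printed result deterministic; the real job spreads the keys over 5 reducers.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LineCountSketch {
    // "Map" step: every line contributes (line, 1).
    // "Reduce" step: the 1s are summed per distinct line, as MyReducer does.
    static Map<String, Long> countLines(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>(); // sorted only for stable output
        for (String line : lines) {
            counts.merge(line, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "a");
        System.out.println(countLines(input)); // prints {a=2, b=1}
    }
}
```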
When I run this program locally, it works fine! But when I run it on another Hadoop machine, I get the following error:
java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzopCodec not found.
at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:116)
at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:156)
at org.apache.hadoop.mapreduce.lib.input.TextInputFormat.isSplitable(TextInputFormat.java:51)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:254)
at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:950)
at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:967)
at org.apache.hadoop.mapred.JobClient.access$500(JobClient.java:170)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:880)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:833)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1177)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:833)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:476)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:506)
at org.apache.hadoop.examples.WordCount.main(WordCount.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.ProgramDriver$ProgramDescription.invoke(ProgramDriver.java:68)
at org.apache.hadoop.util.ProgramDriver.driver(ProgramDriver.java:139)
at org.apache.hadoop.examples.ExampleDriver.main(ExampleDriver.java:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:197)
Caused by: java.lang.ClassNotFoundException: com.hadoop.compression.lzo.LzopCodec
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:249)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:951)
at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:109)
... 27 more
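The "Caused by" part of the trace shows the failure pattern: CompressionCodecFactory.getCodecClasses resolves codec class names through Configuration.getClassByName, which calls Class.forName, and the resulting ClassNotFoundException is rethrown as the IllegalArgumentException above. Per the trace, this happens while computing input splits (TextInputFormat.isSplitable inside FileInputFormat.getSplits), not while writing output. Below is a plain-Java sketch of that lookup pattern without Hadoop. CodecLookupSketch and resolveAll are hypothetical names. It assumes, as Hadoop's CompressionCodecFactory does, that the names come from a comma-separated list such as the io.compression.codecs property; whether this particular cluster lists LzopCodec there is an assumption the trace does not show.

```java
import java.util.ArrayList;
import java.util.List;

public class CodecLookupSketch {
    // Resolves every class name in a comma-separated list by name.
    // One missing class aborts the whole lookup, even if no input file
    // actually uses that codec.
    static List<Class<?>> resolveAll(String commaSeparatedNames) {
        List<Class<?>> result = new ArrayList<>();
        for (String name : commaSeparatedNames.split(",")) {
            String trimmed = name.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            try {
                result.add(Class.forName(trimmed));
            } catch (ClassNotFoundException e) {
                // Same message shape as the error in the question
                throw new IllegalArgumentException("Compression codec " + trimmed + " not found.", e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical list: one class that exists plus the LZO codec,
        // which is not on this classpath.
        try {
            resolveAll("java.lang.String,com.hadoop.compression.lzo.LzopCodec");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```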
But I don't use LzopCodec anywhere. I tried to disable this codec by creating a Configuration and setting its properties:
conf.set("mapred.compress.map.output","false");
conf.set("mapred.output.compress","false");
and passing this configuration to the job, but it still fails.
Does anyone have an idea why it tries to use LzopCodec and how to disable it?