I'm putting together a demo that uses MapReduce to convert delimited files into files of serialized JSON records. I'm using Jackson, but when I run the job the map phase fails after emitting several errors that look Jackson-related:
$ hadoop jar target/map-demo.jar input output
2013-09-16 15:27:25.046 java[7250:1703] Unable to load realm info from SCDynamicStore
13/09/16 15:27:25 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/09/16 15:27:25 INFO input.FileInputFormat: Total input paths to process : 1
13/09/16 15:27:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/09/16 15:27:25 WARN snappy.LoadSnappy: Snappy native library not loaded
13/09/16 15:27:25 INFO mapred.JobClient: Running job: job_201309161312_0011
13/09/16 15:27:26 INFO mapred.JobClient: map 0% reduce 0%
13/09/16 15:27:30 INFO mapred.JobClient: Task Id : attempt_201309161312_0011_m_000000_0, Status : FAILED
Error: org.codehaus.jackson.map.ObjectMapper.setVisibility(Lorg/codehaus/jackson/annotate/JsonMethod;Lorg/codehaus/jackson/annotate/JsonAutoDetect$Visibility;)Lorg/codehaus/jackson/map/ObjectMapper;
attempt_201309161312_0011_m_000000_0: 2013-09-16 15:27:27.856 java[7286:1703] Unable to load realm info from SCDynamicStore
13/09/16 15:27:32 INFO mapred.JobClient: Task Id : attempt_201309161312_0011_m_000000_1, Status : FAILED
Error: org.codehaus.jackson.map.ObjectMapper.setVisibility(Lorg/codehaus/jackson/annotate/JsonMethod;Lorg/codehaus/jackson/annotate/JsonAutoDetect$Visibility;)Lorg/codehaus/jackson/map/ObjectMapper;
attempt_201309161312_0011_m_000000_1: 2013-09-16 15:27:30.566 java[7304:1703] Unable to load realm info from SCDynamicStore
13/09/16 15:27:35 INFO mapred.JobClient: Task Id : attempt_201309161312_0011_m_000000_2, Status : FAILED
Error: org.codehaus.jackson.map.ObjectMapper.setVisibility(Lorg/codehaus/jackson/annotate/JsonMethod;Lorg/codehaus/jackson/annotate/JsonAutoDetect$Visibility;)Lorg/codehaus/jackson/map/ObjectMapper;
attempt_201309161312_0011_m_000000_2: 2013-09-16 15:27:33.298 java[7334:1703] Unable to load realm info from SCDynamicStore
13/09/16 15:27:39 INFO mapred.JobClient: Job complete: job_201309161312_0011
13/09/16 15:27:40 INFO mapred.JobClient: Counters: 7
13/09/16 15:27:40 INFO mapred.JobClient: Job Counters
13/09/16 15:27:40 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=6476
13/09/16 15:27:40 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/09/16 15:27:40 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/09/16 15:27:40 INFO mapred.JobClient: Launched map tasks=4
13/09/16 15:27:40 INFO mapred.JobClient: Data-local map tasks=4
13/09/16 15:27:40 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
13/09/16 15:27:40 INFO mapred.JobClient: Failed map tasks=1
I have a unit test that does exactly the same thing as the MapReduce job, just single-threaded and against the local file system. That works fine.
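Roughly, the test does the following (a simplified sketch rather than my actual test file; the sample input line, the expected JSON, and the field layout are stand-ins for the real ones):

import static org.junit.Assert.assertEquals;

import org.junit.Test;

import com.example.text.Parser;

public class ParserTest {

    @Test
    public void convertsDelimitedLineToJson() {
        // A made-up delimited input line for illustration
        String line = "123|alice|42";

        // Same call the mapper makes
        String json = Parser.toJson(line);

        // Expected JSON is an assumption about the record layout
        assertEquals("{\"id\":\"123\",\"name\":\"alice\",\"score\":\"42\"}", json);
    }
}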
Here is my job setup:
import java.io.IOException;

import com.example.text.Parser;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapDemo {

    public static class Map extends Mapper<Object, Text, Text, NullWritable> {
        private Text text = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Convert one delimited line into a JSON string
            String json = Parser.toJson(line);
            text.set(json);
            context.write(text, NullWritable.get());
        }
    }

    public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // Identity reduce: just pass each JSON key through
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = new Job(configuration, "MapDemo");
        job.setJarByClass(MapDemo.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
My toJson method is as follows:
public String toJson() {
    mapper.setVisibility(JsonMethod.FIELD, Visibility.ANY);
    try {
        return mapper.writeValueAsString(this);
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}
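For context, this method lives on my Record class and mapper is just a plain Jackson 1.x (codehaus) ObjectMapper field. Stripped down, the class looks roughly like this (the data fields shown are placeholders, not my real schema):

import java.io.IOException;

import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
import org.codehaus.jackson.annotate.JsonMethod;
import org.codehaus.jackson.map.ObjectMapper;

public class Record {

    // Shared Jackson 1.x mapper used to serialize records
    private static final ObjectMapper mapper = new ObjectMapper();

    // Placeholder fields; the real record holds the columns from the delimited file
    private String id;
    private String name;

    public String toJson() {
        mapper.setVisibility(JsonMethod.FIELD, Visibility.ANY);
        try {
            return mapper.writeValueAsString(this);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}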
I'm not sure which log files to look at, and so on. Is there anything obvious I'm doing wrong here? What should I try next?
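One check I thought of (just a guess on my part) is whether the ObjectMapper used at runtime comes from the Jackson version I build against or from an older copy somewhere on the Hadoop classpath. A tiny probe like the one below, run with the same classpath as the task, would print which jar the class was actually loaded from:

import org.codehaus.jackson.map.ObjectMapper;

public class JacksonProbe {
    public static void main(String[] args) {
        // Prints the jar or directory that supplied ObjectMapper at runtime
        System.out.println(ObjectMapper.class
                .getProtectionDomain()
                .getCodeSource()
                .getLocation());
    }
}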