I have implemented a custom output format that converts key-value pairs to JSON.
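import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JSONOutputFormat extends TextOutputFormat<Text, IntWritable> {

    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Path path = getOutputPath(context);
        FileSystem fs = path.getFileSystem(conf);
        // One file named after the job, created directly in the job's output directory.
        FSDataOutputStream out = fs.create(new Path(path, context.getJobName()));
        return new JsonRecordWriter(out);
    }

    // Nested so it can extend TextOutputFormat's protected LineRecordWriter
    // and reuse its protected "out" stream.
    private static class JsonRecordWriter extends LineRecordWriter<Text, IntWritable> {
        boolean firstRecord = true;

        @Override
        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.writeBytes("}");   // close the JSON object
            super.close(context);
        }

        @Override
        public synchronized void write(Text key, IntWritable value) throws IOException {
            if (!firstRecord) {
                out.writeBytes(",\r\n");
                firstRecord = false;
            }
            out.writeUTF(key.toString() + ":" + value.toString());
        }

        public JsonRecordWriter(DataOutputStream out) throws IOException {
            super(out);
            out.writeBytes("{");   // open the JSON object
        }
    }
}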
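Separately from the stray characters, `key + ":" + value` yields `Chair:12`, which is not a valid JSON member: JSON object keys must be double-quoted strings. Below is a minimal sketch of a helper that quotes and escapes the key and encodes the member as plain UTF-8 bytes with no length prefix. The class and method names (`JsonMember`, `quote`, `member`, `memberBytes`) are hypothetical and not part of Hadoop.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Hadoop): renders one key/count pair as a
// JSON object member such as "Chair":12 and encodes it as raw UTF-8 bytes.
public class JsonMember {

    // Wraps s in double quotes, escaping the characters JSON requires escaped.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Other control characters use the \\uXXXX form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Builds the member text, e.g. "Chair":12
    static String member(String key, int count) {
        return quote(key) + ":" + count;
    }

    // Plain UTF-8 bytes of the member, with nothing prepended.
    static byte[] memberBytes(String key, int count) {
        return member(key, count).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(member("Chair", 12));
        System.out.println(memberBytes("Chair", 12).length);
    }
}
```

Inside a record writer, such a byte array could be passed to `out.write(bytes)`; how it would be wired into `JsonRecordWriter` is an assumption here, not part of the original code.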
However, the output of the MapReduce job contains some unwanted characters, for example: { NUL Chair:12 NUL BS Book:1}
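The pattern is consistent with `DataOutputStream.writeUTF`, which the writer calls for every record. According to the JDK documentation, `writeUTF` first writes the length of the encoded string as two bytes (high byte first), followed by the characters in modified UTF-8. For `Chair:12` (8 bytes) that prefix is `0x00 0x08`, which a text viewer renders as NUL and BS. The small stdlib-only program below compares this with `String.getBytes(StandardCharsets.UTF_8)`, which adds no prefix. The class name `WriteUtfDemo` is illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Shows which bytes DataOutputStream.writeUTF emits for one record string.
public class WriteUtfDemo {

    // Raw bytes produced by writeUTF(s): 2-byte length, then modified UTF-8.
    static byte[] viaWriteUtf(String s) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF(s);
        } catch (IOException e) {
            // Not expected for an in-memory stream.
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Raw bytes of s encoded as plain UTF-8, with no length prefix.
    static byte[] viaUtf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] b = viaWriteUtf("Chair:12");
        // Total length, then the two prefix bytes: 0x00 (NUL) and 0x08 (BS).
        System.out.printf("%d %02x %02x%n", b.length, b[0], b[1]); // prints 10 00 08
        System.out.println(viaUtf8("Chair:12").length);             // prints 8
    }
}
```

Likewise, `Book:1` (6 bytes) would be preceded by `0x00 0x06`.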
My driver class is as follows:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {

    // Emits (word, 1) for every space-separated token in the input line.
    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final IntWritable one = new IntWritable(1);

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = value.toString().split(" ");
            for (String word : words) {
                context.write(new Text(word), one);
            }
        }
    }

    // Sums the counts emitted for each word.
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable c : values) {
                count += c.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "wordcountjson");
        job.setJarByClass(Driver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputFormatClass(JSONOutputFormat.class);
        // A single reducer, so only one task writes the JSON file.
        job.setNumReduceTasks(1);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
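To inspect the (key, value) pairs this job hands to `JsonRecordWriter.write` without a cluster, the map and reduce logic can be approximated in memory. This is a sketch, not Hadoop code: `LocalWordCount` is a hypothetical name, and a `TreeMap` stands in for the shuffle's sorted grouping (for ASCII keys its ordering matches the byte order `Text` uses).

```java
import java.util.Map;
import java.util.TreeMap;

// In-memory approximation (not Hadoop) of MyMapper + MyReducer with one reducer.
public class LocalWordCount {

    // Splits each line on a single space and sums a count of 1 per token.
    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted, like reducer input keys
        for (String line : lines) {
            for (String word : line.split(" ")) {      // same split as MyMapper
                counts.merge(word, 1, Integer::sum);   // same summing as MyReducer
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Each printed entry corresponds to one write(key, value) call on the record writer.
        count("Chair Book", "Chair").forEach((k, v) -> System.out.println(k + ":" + v));
    }
}
```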
Any idea why these characters appear in the output?