3

我有以下 Reducer 类

public static class TokenCounterReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        JSONObject jsn = new JSONObject();

        for (Text value : values) {
            String[] vals = value.toString().split("\t");
            String[] targetNodes = vals[0].toString().split(",",-1);
            jsn.put("source",vals[1] );
            jsn.put("target",targetNodes);

        }
        // context.write(key, new Text(sum));
    }
}

通过示例(免责声明:此处为新手),我可以看到一般输出类型似乎就像键/值存储。

但是如果我在输出中没有任何键怎么办?或者如果我的输出是其他格式(在我的情况下为 json),我想怎么办?

无论如何,从上面的代码:我想将json对象写入 HDFS?

这在 Hadoop 流中非常简单。但是我如何在 Hadoop java 中做到这一点?

4

2 回答 2

5

如果您只想将 JSON 对象列表写入 HDFS 而不关心键/值的概念,您可以NullWritableReducer输出值中使用 a:

public static class TokenCounterReducer extends Reducer<Text, Text, Text, NullWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            JSONObject jsn = new JSONObject();
            ....
            context.write(new Text(jsn.toString()), null);
        }
    }
}

请注意,您需要更改作业配置才能执行以下操作:

job.setOutputValueClass(NullWritable.class);

通过将您的 JSON 对象写入 HDFS,我了解到您想要存储我在上面描述的 JSON 的字符串表示形式。如果您想将 JSON 的二进制表示形式存储到 HDFS 中,您需要使用SequenceFile. 显然你可以为此编写自己的Writable,但我觉得如果你打算有一个简单的字符串表示,这样会更容易。

于 2013-06-04T19:19:10.747 回答
5

您可以使用 Hadoop 的 OutputFormat 接口来创建自定义格式,这些格式将根据您的意愿写入数据。例如,如果您需要将数据写入 JSON 对象,那么您可以这样做:

public class JsonOutputFormat extends TextOutputFormat<Text, IntWritable> {
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(
            TaskAttemptContext context) throws IOException, 
                  InterruptedException {
        Configuration conf = context.getConfiguration();
        Path path = getOutputPath(context);
        FileSystem fs = path.getFileSystem(conf);
        FSDataOutputStream out = 
                fs.create(new Path(path,context.getJobName()));
        return new JsonRecordWriter(out);
    }

    private static class JsonRecordWriter extends 
          LineRecordWriter<Text,IntWritable>{
        boolean firstRecord = true;
        @Override
        public synchronized void close(TaskAttemptContext context)
                throws IOException {
            out.writeChar('{');
            super.close(null);
        }

        @Override
        public synchronized void write(Text key, IntWritable value)
                throws IOException {
            if (!firstRecord){
                out.writeChars(",\r\n");
                firstRecord = false;
            }
            out.writeChars("\"" + key.toString() + "\":\""+
                    value.toString()+"\"");
        }

        public JsonRecordWriter(DataOutputStream out) 
                throws IOException{
            super(out);
            out.writeChar('}');
        }
    }
}

如果您不想在输出中包含密钥,只需发出 null,例如:

context.write(NullWritable.get(), new IntWritable(sum));

高温高压

于 2013-06-04T19:17:58.907 回答