2

我有这样的记录输入,a|1|Y, b|0|N, c|1|N, d|2|Y, e|1|Y

现在,在映射器中,我必须检查第三列的值。如果是“Y”,那么该记录必须直接写入输出文件,而不将该记录移动到减速器,否则,“N”值记录必须移动到减速器进行进一步处理。

所以,a|1|Y, d|2|Y, e|1|Y 不应该去reducer但是b|0|N, c|1|N应该去reducer然后输出文件。

我怎样才能做到这一点??

4

3 回答 3

2

您可能会做的是使用MultipleOutputs - 单击此处将“Y”和“N”类型的记录从映射器中分离到两个不同的文件中。

接下来,您为两个新生成的“Y”和“N”类型数据集运行单独的作业。对于“Y”类型,将减速器的数量设置为 0,因此不使用减速器。而且,对于“N”类型,使用减速器按照您想要的方式进行操作。

希望这可以帮助。

于 2013-06-20T13:18:47.483 回答
1

看看这是否有效,

public class Xxxx {

    public static class MyMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {        

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {               

            FileSystem fs = FileSystem.get(context.getConfiguration());
            Random r = new Random();                
            FileSplit split = (FileSplit)context.getInputSplit();
            String fileName = split.getPath().getName();                
            FSDataOutputStream out = fs.create(new Path(fileName + "-m-" + r.nextInt()));                               
            String parts[];
            String line = value.toString();
            String[] splits = line.split(",");
            for(String s : splits) {
                parts = s.split("\\|");
                if(parts[2].equals("Y")) {                  
                    out.writeBytes(line);
                }else {
                    context.write(key, value);
                }
            }
            out.close();
            fs.close();
        }       
    }

    public static class MyReducer extends
            Reducer<LongWritable, Text, LongWritable, Text> {
        public void reduce(LongWritable key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            for(Text t : values) {
            context.write(key, t);
            }
        }
    }

    /**
     * @param args
     * @throws IOException 
     * @throws InterruptedException 
     * @throws ClassNotFoundException 
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // TODO Auto-generated method stub

        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");
        conf.set("mapred.job.tracker", "localhost:9001");
        Job job = new Job(conf, "Xxxx");
        job.setJarByClass(Xxxx.class);
        Path outPath = new Path("/output_path");
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        FileInputFormat.addInputPath(job, new Path("/input.txt"));
        FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
于 2013-06-21T01:02:14.600 回答
-1

在您的地图功能中,您将逐行获取输入。使用 | 拆分它 作为分隔符。(String.split()确切地说是使用方法)它看起来像这样

String[] line = value.toString().split('|');

通过以下方式访问此数组的第三个元素line[2]

然后,使用一个简单的if else语句,发出具有 N 值的输出以供进一步处理。

于 2013-06-20T22:08:22.563 回答