我有这样的记录输入,a|1|Y, b|0|N, c|1|N, d|2|Y, e|1|Y
现在,在映射器中,我必须检查第三列的值。如果是“Y”,那么该记录必须直接写入输出文件,而不将该记录移动到减速器,否则,“N”值记录必须移动到减速器进行进一步处理。
所以,a|1|Y, d|2|Y, e|1|Y 不应该去reducer但是b|0|N, c|1|N应该去reducer然后输出文件。
我怎样才能做到这一点??
您可能会做的是使用MultipleOutputs - 单击此处将“Y”和“N”类型的记录从映射器中分离到两个不同的文件中。
接下来,您为两个新生成的“Y”和“N”类型数据集运行单独的作业。对于“Y”类型,将减速器的数量设置为 0,因此不使用减速器。而且,对于“N”类型,使用减速器按照您想要的方式进行操作。
希望这可以帮助。
看看这是否有效,
public class Xxxx {
public static class MyMapper extends
Mapper<LongWritable, Text, LongWritable, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(context.getConfiguration());
Random r = new Random();
FileSplit split = (FileSplit)context.getInputSplit();
String fileName = split.getPath().getName();
FSDataOutputStream out = fs.create(new Path(fileName + "-m-" + r.nextInt()));
String parts[];
String line = value.toString();
String[] splits = line.split(",");
for(String s : splits) {
parts = s.split("\\|");
if(parts[2].equals("Y")) {
out.writeBytes(line);
}else {
context.write(key, value);
}
}
out.close();
fs.close();
}
}
public static class MyReducer extends
Reducer<LongWritable, Text, LongWritable, Text> {
public void reduce(LongWritable key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
for(Text t : values) {
context.write(key, t);
}
}
}
/**
* @param args
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
conf.set("mapred.job.tracker", "localhost:9001");
Job job = new Job(conf, "Xxxx");
job.setJarByClass(Xxxx.class);
Path outPath = new Path("/output_path");
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
FileInputFormat.addInputPath(job, new Path("/input.txt"));
FileOutputFormat.setOutputPath(job, outPath);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
在您的地图功能中,您将逐行获取输入。使用 | 拆分它 作为分隔符。(String.split()
确切地说是使用方法)它看起来像这样
String[] line = value.toString().split('|');
通过以下方式访问此数组的第三个元素line[2]
然后,使用一个简单的if else
语句,发出具有 N 值的输出以供进一步处理。