I am trying to do iteration with map reduce.
I have 3 sequence
job running
static enum UpdateCounter {
INCOMING_ATTR
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
int res = ToolRunner.run(conf, new Driver(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
while(counter >= 0){
Configuration conf = getConf();
/*
* Job 1:
*/
Job job1 = new Job(conf, "");
//other configuration
job1.setMapperClass(ID3ClsLabelMapper.class);
job1.setReducerClass(ID3ClsLabelReducer.class);
Path in = new Path(args[0]);
Path out1 = new Path(CL);
if(counter == 0){
FileInputFormat.addInputPath(job1, in);
}
else{
FileInputFormat.addInputPath(job1, out5);
}
FileInputFormat.addInputPath(job1, in);
FileOutputFormat.setOutputPath(job1,out1);
job1.waitForCompletion(true);
/*
* Job 2:
*
*/
Configuration conf2 = getConf();
Job job2 = new Job(conf2, "");
Path out2 = new Path(ANC);
FileInputFormat.addInputPath(job2, in);
FileOutputFormat.setOutputPath(job2,out2);
job2.waitForCompletion(true);
/*
* Job3
*/
Configuration conf3 = getConf();
Job job3 = new Job(conf3, "");
System.out.println("conf3");
Path out5 = new Path(args[1]);
if(fs.exists(out5)){
fs.delete(out5, true);
}
FileInputFormat.addInputPath(job3,out2);
FileOutputFormat.setOutputPath(job3,out5);
job3.waitForCompletion(true);
FileInputFormat.addInputPath(job3,new Path(args[0]));
FileOutputFormat.setOutputPath(job3,out5);
job3.waitForCompletion(true);
counter = job3.getCounters().findCounter(UpdateCounter.INCOMING_ATTR).getValue();
}
return 0;
Job 3 reducer
public class ID3GSReducer extends Reducer<NullWritable, Text, NullWritable, Text>{
public static final String UpdateCounter = null;
NullWritable out = NullWritable.get();
public void reduce(NullWritable key,Iterable<Text> values ,Context context) throws IOException, InterruptedException{
for(Text val : values){
String v = val.toString();
context.getCounter(UpdateCounter.INCOMING_ATTR).increment(1);
context.write(out, new Text(v));
}
}
}
But showing
14/06/12 10:12:30 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
14/06/12 10:12:30 INFO mapred.JobClient: Total committed heap usage (bytes)=1238630400
conf3
Exception in thread "main" java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:116)
at org.apache.hadoop.mapreduce.Job.getCounters(Job.java:491)
Now how to iterate through the above jobs?
The whole 3 jobs should work until INCOMING_ATTR == 0
and the output of job3 - args[1]
should be the input to job 1
for second iteration
.In order to do this should I change anything.
Please suggest.
Am I doing anything wrong.