I have a load of JobControls running at the same time, all with the same set of ControlledJobs. Each JobControl is dealing with a different set of input/output files, selected by date range, but they are all of the same type. The problem I am observing is that the reduce steps are receiving data meant to be processed by a reducer handling a different date range. The date range is set on the Job, used to determine the input and output, and read from the context within the reducer.
This stops happening if I submit the JobControls sequentially, but that's no good. Is this something I should be solving with a custom partitioner? How would I even determine the correct reducer for a key if I don't know which reducer is dealing with my current date range? Why would the instantiated reducers not be locked to their JobControl?
I have written all the JobControls, Jobs, Mappers and Reducers against their base implementations in Java.
I'm using Hadoop 2.0.3-alpha with YARN. Could that have anything to do with it?
I have to be a little careful sharing the code, but here's a sanitised mapper:
@Override
protected void map(LongWritable key, ProtobufWritable<Model> value, Context context)
        throws IOException, InterruptedException {
    // Key by session id so every model for a session reaches the same reduce call.
    context.write(new Text(value.get().getSessionId()),
            new ProtobufModelWritable(value.get()));
}
And the reducer:
@Override
protected void reduce(Text sessionId, Iterable<ProtobufModelWritable> models, Context context)
        throws IOException, InterruptedException {
    Interval interval = getIntervalFromConfig(context);
    Model2 model2 = collapseModels(Iterables.transform(models, TO_MODEL));
    // This is the check that fails: models from other date ranges turn up here.
    Preconditions.checkArgument(interval.contains(model2.getTimeStamp()),
            "model2: " + model2 + " does not belong in " + interval);
}
private Interval getIntervalFromConfig(Context context) {
    String i = context.getConfiguration().get(INTERVAL_KEY);
    return Utils.interval(i);
}