
We have a scenario of generating a unique key for every single row in a file. We have a timestamp column, but in a few cases there are multiple rows with the same timestamp.

We decided that the unique value would be the timestamp appended with its respective count, as in the program below.
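The intended key scheme (formatted timestamp plus a zero-padded per-timestamp count) can be sketched in plain Java; the class and method names here are ours, and a fixed UTC zone is used for a stable example:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class KeyScheme {
    // Sketch of the key: formatted timestamp followed by a
    // zero-padded per-timestamp count, mirroring the reducer logic below.
    static String uniqueKey(long timeMillis, int count) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
        sdf.setTimeZone(TimeZone.getTimeZone("UTC")); // fixed zone for reproducibility
        return sdf.format(new Date(timeMillis)) + String.format("%010d", count);
    }

    public static void main(String[] args) {
        // Epoch 0 in UTC formats as 19700101000000; count 1 pads to 0000000001.
        System.out.println(uniqueKey(0L, 1));
    }
}
```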

The mapper just emits the timestamp as the key and the entire row as its value; the unique key is then generated in the reducer.

The problem is that the map outputs about 236 rows, of which only 230 records are fed as input to the reducer, which in turn outputs the same 230 records.
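As an aside, rows with fewer than TIME_INDEX tab-separated fields are dropped silently inside map() (they never appear in the map output count, so they do not explain the 236-vs-230 gap). The guard can be replicated in plain Java; the sample rows are made up:

```java
public class GuardCheck {
    private static final String SEPERATOR = "\t"; // same spelling as the job's constant
    private static final int TIME_INDEX = 10;

    // Replicates the mapper's guard: a row is emitted only if it has at
    // least TIME_INDEX tab-separated fields.
    static boolean emitted(String input) {
        String[] vals = input.split(SEPERATOR);
        return vals != null && vals.length >= TIME_INDEX;
    }

    public static void main(String[] args) {
        String good = "a\tb\tc\td\te\tf\tg\th\ti\t1372636800000"; // 10 fields
        String bad = "a\tb\tc"; // 3 fields: skipped without any log or counter
        System.out.println(emitted(good)); // true
        System.out.println(emitted(bad));  // false
    }
}
```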

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;

public class UniqueKeyGenerator extends Configured implements Tool {

    private static final String SEPERATOR = "\t";
    private static final int TIME_INDEX = 10;
    private static final String COUNT_FORMAT_DIGITS = "%010d";

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text row, Context context)
                throws IOException, InterruptedException {
            String input = row.toString();
            String[] vals = input.split(SEPERATOR);
            if (vals != null && vals.length >= TIME_INDEX) {
                context.write(new Text(vals[TIME_INDEX - 1]), row);
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {

        @Override
        protected void reduce(Text eventTimeKey,
                Iterable<Text> timeGroupedRows, Context context)
                throws IOException, InterruptedException {
            int cnt = 1;
            final String eventTime = eventTimeKey.toString();
            for (Text val : timeGroupedRows) {
                final String res = SEPERATOR.concat(getDate(
                        Long.valueOf(eventTime)).concat(
                        String.format(COUNT_FORMAT_DIGITS, cnt)));
                val.append(res.getBytes(), 0, res.length());
                cnt++;
                context.write(NullWritable.get(), val);
            }
        }
    }

    public static String getDate(long time) {
        SimpleDateFormat utcSdf = new SimpleDateFormat("yyyyMMddhhmmss");
        utcSdf.setTimeZone(TimeZone.getTimeZone("America/Los_Angeles"));
        return utcSdf.format(new Date(time));
    }

    public int run(String[] args) throws Exception {
        conf(args);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        conf(args);
    }

    private static void conf(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "uniquekeygen");
        job.setJarByClass(UniqueKeyGenerator.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // job.setNumReduceTasks(400);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

}

The gap is consistent for higher numbers of lines, and the difference is as large as 208,969 records for an input of 20,855,982 lines. What might be the reason for the reduced input to the reducer?


1 Answer


The reason behind the data loss was that one of the blocks hit a runtime exception, so the data available in that block was skipped entirely, resulting in reduced input to the reducer.
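One place such a runtime exception can arise in the code above is the reducer's Long.valueOf(eventTime) call: a single non-numeric timestamp throws NumberFormatException and fails the task. A minimal sketch of guarding that parse (the class and method names are illustrative, not from the original job):

```java
public class SafeParse {
    // Guarded version of the reducer's Long.valueOf(eventTime) call:
    // returns null for a malformed timestamp instead of throwing, so the
    // caller can increment a "bad record" counter and skip the row.
    static Long parseTimestamp(String eventTime) {
        try {
            return Long.valueOf(eventTime);
        } catch (NumberFormatException e) {
            return null; // malformed timestamp: skip rather than fail the task
        }
    }

    public static void main(String[] args) {
        System.out.println(parseTimestamp("1372636800000")); // parses fine
        System.out.println(parseTimestamp("2013-07-01"));    // null, would have thrown
    }
}
```

In the real job, the catch branch would increment a custom Hadoop counter so that dropped rows show up in the job's counter report instead of disappearing silently.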

Thanks, Sathish.

Answered 2013-07-01T03:40:13.057