0

使用我的 mapreduce 作业后,这是输出:

User16565   Logins: 1   Orders:1
User16566   Logins: 2   Orders:2
User16567   Logins: 1   Orders:1

一切看起来都很棒,但是当日志文件有数千个条目时,它并不是很有帮助。有没有办法改变我的代码来总结“登录”和“订单”,这样我就可以计算出差异?

编辑:新问题/问题

日志示例:

2013-01-01T08:48:09.009+0100,feature:login,-,User73511,-,-,-,-
2013-01-01T03:58:05.005+0100,feature:order-created,-,User73511,-,-,-,-
2013-01-01T01:26:30.030+0100,feature:login,-,User14253,-,-,-,-
2013-01-01T19:45:01.001+0100,feature:order-created,-,User73511,-,-,-,-

我在我的代码中发现了一个错误。我意识到登录和订单计数不正确。起初看起来输出是正确的,但是当我手动检查登录名和订单时,我意识到有一个错误。输出:

User73511   Logins: 3   Orders:2
User14253   Logins: 1   Orders:1

应该:

User73511   Logins: 1   Orders:2
User14253   Logins: 1   Orders:0

这是整个代码:

public class UserOrderCount {

    public static class SingleUserMapper extends
            Mapper<LongWritable, Text, Text, CountInformationTuple> {

        private Text outUserId = new Text();
        private CountInformationTuple outCountOrder = new CountInformationTuple();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String tempString = value.toString();
            String[] singleUserData = tempString.split(",");
            String userId = singleUserData[3];
            String featureId = singleUserData[1];

        if (featureId.contains("feature:order-created")) {
                outCountOrder.setCountOrder(1);
        }
        if (featureId.contains("feature:login")) {
                outCountOrder.setCountLogin(1);
        }


            outUserId.set(userId);
            context.write(outUserId, outCountOrder);
        }
    }

    public static class SingleUserReducer extends
            Reducer<Text, CountInformationTuple, Text, CountInformationTuple> {

        private CountInformationTuple result = new CountInformationTuple();

        public void reduce(Text key, Iterable<CountInformationTuple> values,
                Context context) throws IOException, InterruptedException {

            int login = 0;
            int order = 0;

            for (CountInformationTuple val : values) {
                login += val.getCountLogin();
                order += val.getCountOrder();
            }

            result.setCountLogin(login);
            result.setCountOrder(order);

            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: UserOrderCount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf);
        job.setJobName("UserOrderCount");
        job.setJarByClass(UserOrderCount.class);

        job.setMapperClass(SingleUserMapper.class);
        job.setCombinerClass(SingleUserReducer.class);
        job.setReducerClass(SingleUserReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CountInformationTuple.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class CountInformationTuple implements Writable {
        private int countOrder = 0;
        private int countLogin = 0;

        public int getCountOrder() {
            return countOrder;
        }

        public void setCountOrder(int order) {
            this.countOrder = order;
        }

        public int getCountLogin() {
            return countLogin;
        }

        public void setCountLogin(int login) {
            this.countLogin = login;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            countOrder = in.readInt();
            countLogin = in.readInt();

        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(countLogin);
            out.writeInt(countOrder);

        }

        @Override
        public String toString() {
            return "Logins: "+ countLogin + "\t" + "Orders:" + countOrder;
        }
    }
}
4

2 回答 2

2

对于感兴趣的人:解决了我的“错误输出”错误。

public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String tempString = value.toString();
        String[] stringData = tempString.split(",");

        String userID = stringData[3];
        String featureID = stringData[1];

        int login = 0;
        int order = 0;

        if (featureID.matches("feature:login")) {
            login++;
        } else if (featureID.matches("feature:order-created")) {
            order++;
        }

        outUserID.set(userID);
        outUserCount.set(login, order);

        context.write(outUserID, outUserCount);

    }

public static class UserCountTuple implements Writable {

        private IntWritable countLogin;
        private IntWritable countOrder;

        public UserCountTuple() {
            set(new IntWritable(0), new IntWritable(0));
        }

        public void set(int countLogin, int countOrder) {
            this.countLogin.set(countLogin);
            this.countOrder.set(countOrder);
        }

        public void set(IntWritable countLogin, IntWritable countOrder) {
            this.countLogin = countLogin;
            this.countOrder = countOrder;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            countLogin.readFields(in);
            countOrder.readFields(in);

        }

        @Override
        public void write(DataOutput out) throws IOException {
            countLogin.write(out);
            countOrder.write(out);

        }

        public IntWritable getLogin() {
            return countLogin;
        }

        public IntWritable getOrder() {
            return countOrder;
        }

        @Override
        public String toString() {
            return "Logins: " + countLogin + "\t" + "Orders:" + countOrder;
        }

    }
于 2013-02-28T15:23:31.167 回答
1

由于您希望获得单个文件作为结果,您可以配置 MapReduce 作业jobConf.setNumReduceTasks(1)以仅使用单个 reduce 任务,请参阅JobConf JavaDoc了解更多信息。

现在,您的唯一一个 reduce 任务获得了所有用户login的全部order信息。您可以在 reduce 任务中对所有已处理记录login的值求和,然后在cleanup()order方法中输出求和值,该方法仅在处理单个 reduce 任务的所有输入记录后调用一次。示例代码:

public static class SingleUserReducer extends
        Reducer<Text, CountInformationTuple, Text, CountInformationTuple> {

    private CountInformationTuple result = new CountInformationTuple();
    private int login = 0;
    private int order = 0;

    public void reduce(Text key, Iterable<CountInformationTuple> values,
            Context context) throws IOException, InterruptedException {

        for (CountInformationTuple val : values) {
            login += val.getCountLogin();
            order += val.getCountOrder();
        }
    }

    public void cleanup(Context context) throws IOException, InterruptedException {
        result.setCountLogin(login);
        result.setCountOrder(order);

        context.write(new Text("total"), result);
    }
}

您将获得一条记录作为输出,其总和为loginorder。如果需要,您可以修改该cleanup()方法以计算差异和其他度量。

于 2013-02-23T09:39:51.433 回答