使用我的 mapreduce 作业后,这是输出:
User16565 Logins: 1 Orders:1
User16566 Logins: 2 Orders:2
User16567 Logins: 1 Orders:1
一切看起来都很棒,但是当日志文件有数千个条目时,它并不是很有帮助。有没有办法改变我的代码来总结“登录”和“订单”,这样我就可以计算出差异?
编辑:新问题/问题
日志示例:
2013-01-01T08:48:09.009+0100,feature:login,-,User73511,-,-,-,-
2013-01-01T03:58:05.005+0100,feature:order-created,-,User73511,-,-,-,-
2013-01-01T01:26:30.030+0100,feature:login,-,User14253,-,-,-,-
2013-01-01T19:45:01.001+0100,feature:order-created,-,User73511,-,-,-,-
我在我的代码中发现了一个错误。我意识到登录和订单计数不正确。起初看起来输出是正确的,但是当我手动检查登录名和订单时,我意识到有一个错误。输出:
User73511 Logins: 3 Orders:2
User14253 Logins: 1 Orders:1
应该:
User73511 Logins: 1 Orders:2
User14253 Logins: 1 Orders:0
这是整个代码:
public class UserOrderCount {
public static class SingleUserMapper extends
Mapper<LongWritable, Text, Text, CountInformationTuple> {
private Text outUserId = new Text();
private CountInformationTuple outCountOrder = new CountInformationTuple();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String tempString = value.toString();
String[] singleUserData = tempString.split(",");
String userId = singleUserData[3];
String featureId = singleUserData[1];
if (featureId.contains("feature:order-created")) {
outCountOrder.setCountOrder(1);
}
if (featureId.contains("feature:login")) {
outCountOrder.setCountLogin(1);
}
outUserId.set(userId);
context.write(outUserId, outCountOrder);
}
}
public static class SingleUserReducer extends
Reducer<Text, CountInformationTuple, Text, CountInformationTuple> {
private CountInformationTuple result = new CountInformationTuple();
public void reduce(Text key, Iterable<CountInformationTuple> values,
Context context) throws IOException, InterruptedException {
int login = 0;
int order = 0;
for (CountInformationTuple val : values) {
login += val.getCountLogin();
order += val.getCountOrder();
}
result.setCountLogin(login);
result.setCountOrder(order);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: UserOrderCount <in> <out>");
System.exit(2);
}
Job job = new Job(conf);
job.setJobName("UserOrderCount");
job.setJarByClass(UserOrderCount.class);
job.setMapperClass(SingleUserMapper.class);
job.setCombinerClass(SingleUserReducer.class);
job.setReducerClass(SingleUserReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CountInformationTuple.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class CountInformationTuple implements Writable {
private int countOrder = 0;
private int countLogin = 0;
public int getCountOrder() {
return countOrder;
}
public void setCountOrder(int order) {
this.countOrder = order;
}
public int getCountLogin() {
return countLogin;
}
public void setCountLogin(int login) {
this.countLogin = login;
}
@Override
public void readFields(DataInput in) throws IOException {
countOrder = in.readInt();
countLogin = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(countLogin);
out.writeInt(countOrder);
}
@Override
public String toString() {
return "Logins: "+ countLogin + "\t" + "Orders:" + countOrder;
}
}
}