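I want to assign a sequence number to the events coming from the Mapper class, based on the time at which each event occurred.
For example, I have 100 events, each with a timestamp. I want to sort them by time and then assign each one a sequence number in the reducer phase. In addition, duplicate records (the same event occurring at the same time) should be removed in the reducer phase.
Mapper: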
public class EventMapper extends Mapper<LongWritable, Text, Text, Event> {
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        Text newKey;
        Event e = new Event();
        e.setAllValues(line);
        newKey = new Text(e.getKey());
        context.write(newKey, e);
    }
}
Reducer (what I am trying to do):
public class EventReducer extends Reducer<Text, Event, Text, Text> {
    public void reduce(Text key, Iterator<Event> itrtr, Context context) throws IOException, InterruptedException {
        Event e;
        List<Event> l = new ArrayList<Event>();
        while(itrtr.hasNext()){
            e = itrtr.next();
            l.add(e);
        }
        Collections.sort(l);
        long i = 1;
        for (Event event : l) {
            event.setId(++i);
            context.write(key, new Text(event.toString()));
        }
    }
}
All the ids come out as 0. How can I achieve this? Am I taking the wrong approach?
Here is the Event class:
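A possible explanation for the zero ids, offered as a sketch rather than a confirmed diagnosis: Hadoop's Reducer declares reduce(KEYIN, Iterable<VALUEIN>, Context), while the reducer above declares a method taking an Iterator. A method with a different parameter type is an overload, not an override, so the framework would keep calling the default reduce, which passes each value through unchanged. The plain-Java example below illustrates that pitfall without Hadoop; BaseReducer, IteratorReducer and IterableReducer are hypothetical stand-ins, not Hadoop classes.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class OverloadDemo {
    // Hypothetical stand-in for the framework base class: it only ever calls reduce(Iterable).
    static class BaseReducer {
        String reduce(Iterable<String> values) {
            return "base (identity) reduce";
        }

        // Plays the role of the framework invoking the reducer.
        String run(List<String> values) {
            return reduce(values);
        }
    }

    // Declares reduce(Iterator): a new overload, so the base method is still the one called.
    static class IteratorReducer extends BaseReducer {
        String reduce(Iterator<String> values) {
            return "custom reduce";
        }
    }

    // Declares reduce(Iterable) with @Override: this one really replaces the base method.
    static class IterableReducer extends BaseReducer {
        @Override
        String reduce(Iterable<String> values) {
            return "custom reduce";
        }
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("a", "b");
        System.out.println(new IteratorReducer().run(values)); // base (identity) reduce
        System.out.println(new IterableReducer().run(values)); // custom reduce
    }
}
```

Adding @Override to the reduce method in the real reducer should make the compiler flag a signature mismatch of this kind.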
public class Event implements Writable, WritableComparable<Event> {
    //Some variables and getter + setters

    @Override
    public String toString() {
        String delimiter1 = "|";
        return this.date + delimiter1
                + this.evName + delimiter1
                + this.evType + delimiter1
                + this.evValue + delimiter1
                + this.name + delimiter1
                + this.id;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        try {
            this.date = converStringToDate((WritableUtils.readString(in)).toString(), dateFormat);
        } catch (ParseException ex) {
            System.out.println("Wront date . Pe");
        }
        this.evName = WritableUtils.readString(in);
        this.evType = WritableUtils.readString(in);
        this.evValue = WritableUtils.readString(in);
        this.name = WritableUtils.readString(in);
        this.id = WritableUtils.readVLong(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        WritableUtils.writeString(out, this.convertDateToString(date));
        WritableUtils.writeString(out, evName);
        WritableUtils.writeString(out, evType);
        WritableUtils.writeString(out, evValue);
        WritableUtils.writeString(out, name);
        WritableUtils.writeVLong(out, id);
    }

    public int compareTo(Event o) {
        long value = this.getDate().getTime() - o.getDate().getTime();
        if (value == 0) {
            return 0;
        } else if (value > 1) {
            return -1;
        } else {
            return 1;
        }
    }

    public void setAllValues(String input) {
        String[] arrValues = input.split(delimiter);
        System.out.println("No of Values = " + arrValues.length);
        try {
            this.date = converStringToDate(arrValues[0], dateFormat);
        } catch (ParseException pe) {
            System.out.println("pe> Error in date");
        }
        if (arrValues.length >= 2) {
            this.evName = arrValues[1];
        }
        if (arrValues.length >= 3) {
            this.evType = arrValues[2];
        }
        if (arrValues.length >= 4) {
            this.evValue = arrValues[3];
        }
        if (arrValues.length >= 5) {
            this.name = arrValues[4];
        }
    }

    public String getKey() {
        //return convertDateToString(this.date) + this.evName + this.evType;
        return this.evName;
    }
}
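For reference, the sort, de-duplicate and number logic described above can be sketched in plain Java, independent of Hadoop. This is only an illustration under assumptions: Ev is a hypothetical, simplified stand-in for Event (a millisecond timestamp and a name), a duplicate is assumed to mean same time and same name, and numbering is assumed to start at 1. It uses Long.compare for ascending order; note that the compareTo shown above tests value > 1 rather than value > 0 and returns -1 for the later event, which would give descending order and misorder events 1 ms apart. Inside a real reducer each value would also need to be copied before being added to a list, since Hadoop, as far as I know, reuses the same value object while iterating.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SequenceSketch {
    // Hypothetical, simplified stand-in for the Event class in the question.
    static class Ev {
        final long time;   // event time in milliseconds
        final String name; // event name
        long id;           // sequence number, assigned below

        Ev(long time, String name) {
            this.time = time;
            this.name = name;
        }
    }

    // Sorts by time (then name), skips duplicates, and numbers the survivors from 1.
    static List<Ev> number(List<Ev> input) {
        List<Ev> sorted = new ArrayList<>(input);
        sorted.sort((a, b) -> {
            int c = Long.compare(a.time, b.time); // ascending, no subtraction overflow
            return c != 0 ? c : a.name.compareTo(b.name);
        });

        List<Ev> out = new ArrayList<>();
        long seq = 0;
        Ev prev = null;
        for (Ev e : sorted) {
            // After sorting, duplicates are adjacent, so comparing with the previous one suffices.
            if (prev != null && prev.time == e.time && prev.name.equals(e.name)) {
                continue;
            }
            e.id = ++seq;
            out.add(e);
            prev = e;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Ev> events = Arrays.asList(
                new Ev(300, "b"), new Ev(100, "a"), new Ev(100, "a"), new Ev(200, "a"));
        for (Ev e : number(events)) {
            System.out.println(e.id + "|" + e.time + "|" + e.name);
        }
    }
}
```

With the four sample events in main, the duplicate at time 100 is dropped and the remaining three are numbered 1 to 3 in time order.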