0

我希望根据事件发生的时间为来自 Mapper 类的事件分配一个序列号。

例如,我有 100 个事件,其中有时间。我希望根据时间对它们进行排序,然后在减速器阶段为它们分配一个序列号。此外,如果它们是重复的,则在 reducer 阶段删除重复的记录(同时发生相同的事件)。

映射器方法:

public class EventMapper extends Mapper<LongWritable, Text, Text, Event> {

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    Text newKey;
    Event e = new Event();
    e.setAllValues(line);
    newKey = new Text(e.getKey());
    context.write(newKey, e);
}
}

减速器方法(我想要的东西):

public class EventReducer extends Reducer<Text, Event, Text, Text> {

public void reduce(Text key, Iterator<Event> itrtr, Context context) throws IOException, InterruptedException {
    Event e;
    List<Event> l = new ArrayList<Event>();
    while(itrtr.hasNext()){
        e = itrtr.next();
         l.add(e);
    }
    Collections.sort(l);
    long i = 1;
    for (Event event : l) {
        event.setId(++i);
        context.write(key, new Text(event.toString()));
    }
}
}

我把所有的 id 都设为 0。我怎样才能做到这一点?我是否采用了错误的方法。

这是事件类:

public class Event implements Writable, WritableComparable<Event> {
//Some variables and getter + setters
 @Override
public String toString() {
    String delimiter1 = "|";
    return this.date + delimiter1
            + this.evName + delimiter1
            + this.evType + delimiter1
            + this.evValue + delimiter1
            + this.name + delimiter1
            + this.id;
}

@Override
public void readFields(DataInput in) throws IOException {
    try {
        this.date = converStringToDate((WritableUtils.readString(in)).toString(), dateFormat);
    } catch (ParseException ex) {
        System.out.println("Wront date . Pe");
    }
    this.evName = WritableUtils.readString(in);
    this.evType = WritableUtils.readString(in);
    this.evValue = WritableUtils.readString(in);
    this.name = WritableUtils.readString(in);
    this.id = WritableUtils.readVLong(in);
}

@Override
public void write(DataOutput out) throws IOException {
    // TODO Auto-generated method stub
    WritableUtils.writeString(out, this.convertDateToString(date));
    WritableUtils.writeString(out, evName);
    WritableUtils.writeString(out, evType);
    WritableUtils.writeString(out, evValue);
    WritableUtils.writeString(out, name);
    WritableUtils.writeVLong(out, id);
}

public int compareTo(Event o) {
    long value = this.getDate().getTime() - o.getDate().getTime();
    if (value == 0) {
        return 0;
    } else if (value > 1) {
        return -1;
    } else {
        return 1;
    }
    }
public void setAllValues(String input) {
    String[] arrValues = input.split(delimiter);
    System.out.println("No of Values = " + arrValues.length);
    try {
        this.date = converStringToDate(arrValues[0], dateFormat);
    } catch (ParseException pe) {
        System.out.println("pe> Error in date");
    }
    if (arrValues.length >= 2) {
        this.evName = arrValues[1];
    }
    if (arrValues.length >= 3) {
        this.evType = arrValues[2];
    }
    if (arrValues.length >= 4) {
        this.evValue = arrValues[3];
    }
    if (arrValues.length >= 5) {
        this.name = arrValues[4];
    }
}

public String getKey() {
    //return convertDateToString(this.date) + this.evName + this.evType;
    return this.evName;
}
}
4

1 回答 1

0

几个建议:

  • 更改 getKey() 以返回 date.getTime()。这是一个长值,并且比字符串更快。将您的内部键类型更改为 LongWritable。
  • 您正在利用 hadoop 行为,即在传递给减速器之前按键值对记录进行排序。这是一种排序方式,但您必须确保在作业配置中将 numberOfReducers 设置为 1。否则,您将有多个减速器从 1 开始在它们自己的分区上分配等级。
  • 您可以使用多个 reducer,但您必须在这项工作之后完成一项工作,以合并所有内部排名的数据分区。
  • 请记住,您的 reducer 将为每个键值调用一次,即使该键有多个记录(例如同时多个事件)。Iterable如果您想忽略这些重复事件,那么无论该值有多少条记录,reducer 都应该只将一条记录写入上下文。
  • 为了正确分配等级(id),您需要在您的reducer 中有一个long 类型的实例变量(调用它counter)。您需要在方法中初始化它并在setup()方法中增加它reduce()
于 2012-11-28T14:43:51.427 回答