假设我有一个表示二元组 (String stockSymbol, long timestamp) 的 CompositeKey。我们可以先按 stockSymbol 字段进行分组,把同一只股票的所有数据放在一起;然后在 shuffle 阶段的“二次排序”中,使用 long 类型的 timestamp 成员对时间序列点进行排序,使它们到达 reducer 时已经完成分区并排好序。
public class CompositeKey implements WritableComparable<CompositeKey> {
// natural key is (stockSymbol)
// composite key is a pair (stockSymbol, timestamp)
private String stockSymbol;
private long timestamp;
......//Getters and setters omitted for clarity here
/**
 * Restores this key's state from a serialized stream.
 *
 * <p>The stock symbol is read first (via {@code readUTF}), followed by the
 * timestamp (via {@code readLong}); this is the same field order that
 * {@code write(DataOutput)} emits.
 *
 * @param in the input to read the symbol and timestamp from
 * @throws IOException if either read fails
 */
@Override
public void readFields(DataInput in) throws IOException {
    // Order matters: symbol first, then timestamp.
    stockSymbol = in.readUTF();
    timestamp = in.readLong();
}
/**
 * Serializes this key to the given output.
 *
 * <p>The stock symbol is written first (via {@code writeUTF}), followed by
 * the timestamp (via {@code writeLong}); {@code readFields(DataInput)} reads
 * the fields back in the same order.
 *
 * @param out the output to write the symbol and timestamp to
 * @throws IOException if either write fails
 */
@Override
public void write(DataOutput out) throws IOException {
    // Order matters: symbol first, then timestamp.
    out.writeUTF(stockSymbol);
    out.writeLong(timestamp);
}
/**
 * Orders composite keys by stock symbol first and, for equal symbols,
 * by ascending timestamp.
 *
 * @param other the key to compare against
 * @return the result of the symbol comparison when the symbols differ;
 *         otherwise -1, 0, or 1 according to whether this timestamp is
 *         less than, equal to, or greater than {@code other}'s
 */
@Override
public int compareTo(CompositeKey other) {
    // Compare the symbols once and reuse the result instead of
    // repeating the string comparison on the non-equal path.
    int bySymbol = this.stockSymbol.compareTo(other.stockSymbol);
    if (bySymbol != 0) {
        return bySymbol;
    }
    // Same symbol: fall back to the timestamp (yields exactly -1, 0, or 1).
    return Long.compare(this.timestamp, other.timestamp);
}
现在,CompositeKey 的比较器如下:
/**
 * Comparator over {@code CompositeKey} instances: keys are ordered by stock
 * symbol, and keys with the same symbol are ordered by ascending timestamp.
 */
public class CompositeKeyComparator extends WritableComparator {

    /**
     * Registers {@code CompositeKey} as the key class handled by this comparator.
     * NOTE(review): the {@code true} argument is forwarded to
     * {@code WritableComparator}; presumably it asks the base class to create
     * key instances for deserialization — confirm against the Hadoop version in use.
     */
    protected CompositeKeyComparator() {
        super(CompositeKey.class, true);
    }

    /**
     * Compares two keys, both of which must be {@code CompositeKey} instances.
     *
     * @param wc1 the left-hand key
     * @param wc2 the right-hand key
     * @return the symbol comparison result when the symbols differ; otherwise
     *         -1, 0, or 1 according to the timestamps
     */
    @Override
    public int compare(WritableComparable wc1, WritableComparable wc2) {
        CompositeKey left = (CompositeKey) wc1;
        CompositeKey right = (CompositeKey) wc2;

        int symbolOrder = left.getStockSymbol().compareTo(right.getStockSymbol());
        if (symbolOrder != 0) {
            // Different symbols: the symbol alone decides the order.
            return symbolOrder;
        }

        // Stock symbols are equal here; the timestamp breaks the tie.
        return Long.compare(left.getTimestamp(), right.getTimestamp());
    }
}