A fairly common problem, but I can't work out which approach to choose.
I have the fields: id, creationDate, state, dateDiff.
id is a natural key.
I need my reducer to receive:
KEY(id), VALUE(creationDate, state, dateDiff)
where the VALUE(creationDate, state, dateDiff) entries are sorted by creationDate, then state.
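Which key should I choose? I created a composite key (id, creationDate, state).
I implemented a partitioner by id,
a grouping comparator by id,
and a sort comparator by id, creationDate, state.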
But my reducer only gets each id once... For example, for these records:
1 123 true 6
1 456 false 6
1 789 true 7
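I only get
1 123 true 6
in my reducer. It looks as if I had no sort comparator, partitioner or grouping comparator at all :( I'm clearly misunderstanding something.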
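For reference, this is what the partitioner, sort comparator and grouping comparator described above are expected to produce for the example records: a single reduce call for id 1, with its three values ordered by creationDate, then state. The sketch below is a plain-Java simulation of the shuffle under that assumption, not Hadoop code; `SecondarySortSimulation`, `Rec`, `partition` and `reduceCalls` are hypothetical names, and the partition formula (masked hash of the id) is an assumption.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java simulation of a secondary-sort shuffle (hypothetical names, no Hadoop classes).
class SecondarySortSimulation {

    // One map output record: composite key (id, creationDate, state) plus dateDiff.
    static final class Rec {
        final long id;
        final long creationDate;
        final boolean state;
        final long dateDiff;

        Rec(long id, long creationDate, boolean state, long dateDiff) {
            this.id = id;
            this.creationDate = creationDate;
            this.state = state;
            this.dateDiff = dateDiff;
        }

        @Override
        public String toString() {
            return id + " " + creationDate + " " + state + " " + dateDiff;
        }
    }

    // Partitioner: depends on the natural key (id) only, so all records of one id meet in one reducer.
    static int partition(Rec r, int numPartitions) {
        return (Long.hashCode(r.id) & Integer.MAX_VALUE) % numPartitions;
    }

    // Sort comparator: the full composite key (id, creationDate, state).
    static final Comparator<Rec> SORT = Comparator
            .comparingLong((Rec r) -> r.id)
            .thenComparingLong(r -> r.creationDate)
            .thenComparing(r -> r.state);

    // Grouping comparator: the natural key (id) only.
    static final Comparator<Rec> GROUP = Comparator.comparingLong((Rec r) -> r.id);

    // For one partition, returns the values of each reduce() call in the order the reducer sees them.
    static List<List<Rec>> reduceCalls(List<Rec> mapOutput, int partition, int numPartitions) {
        List<Rec> sorted = new ArrayList<>();
        for (Rec r : mapOutput) {
            if (partition(r, numPartitions) == partition) {
                sorted.add(r);
            }
        }
        sorted.sort(SORT);

        List<List<Rec>> calls = new ArrayList<>();
        List<Rec> current = null;
        for (Rec r : sorted) {
            // A new reduce() call starts only when the grouping comparator sees a different id.
            if (current == null || GROUP.compare(current.get(0), r) != 0) {
                current = new ArrayList<>();
                calls.add(current);
            }
            current.add(r);
        }
        return calls;
    }

    public static void main(String[] args) {
        List<Rec> mapOutput = List.of(
                new Rec(1, 789, true, 7),
                new Rec(1, 123, true, 6),
                new Rec(1, 456, false, 6));
        int numPartitions = 3;
        int p = partition(mapOutput.get(0), numPartitions);
        System.out.println(reduceCalls(mapOutput, p, numPartitions));
        // prints [[1 123 true 6, 1 456 false 6, 1 789 true 7]]
    }
}
```

The point of the three pieces working together: the partitioner and grouping comparator look only at the id, while the sort comparator looks at the whole composite key, so one reduce call should iterate all three values already in creationDate order.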
Here is my code:
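// Imports used by POIMapper and POIReducer (Pair is assumed to be org.javatuples.Pair)
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.javatuples.Pair;

public class POIMapper extends Mapper<LongWritable, Text, XVLRKey, XVLRValue> {
    private static final Log LOG = LogFactory.getLog(POIMapper.class);

    @Override
    public void map(LongWritable key, Text csvLine, Context context) throws IOException, InterruptedException {
        Pair<XVLRKey, XVLRValue> xvlrPair = POIUtil.parseKeyAndValue(csvLine.toString(), POIUtil.CSV_DELIMITER);
        context.write(xvlrPair.getValue0(), xvlrPair.getValue1());
    }
}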
public class POIReducer extends Reducer<XVLRKey, XVLRValue, LongWritable, Text> {
    private static final Log LOG = LogFactory.getLog(POIReducer.class);
    private final Text textForOutput = new Text();

    @Override
    public void reduce(XVLRKey key, Iterable<XVLRValue> values, Context context)
            throws IOException, InterruptedException {
        XVLROutput out = null;
        // Just check that values are correctly attached to keys. No logic here...
        LOG.info("\nPOIReducer: key:" + key);
        for (XVLRValue value : values) {
            LOG.info("\n --- --- --- value:" + value + "\n");
            textForOutput.set(print(key, value));
            context.write(key.getMsisdn(), textForOutput);
        }
    }

    private String print(XVLRKey key, XVLRValue value) {
        StringBuilder builder = new StringBuilder();
        builder.append(value.getLac()).append("\t")
               .append(value.getCellId()).append("\t")
               .append(key.getDateOccurrence()).append("\t")
               .append(value.getTimeDelta());
        return builder.toString();
    }
}
Job code:
JobBuilder<POIJob> jobBuilder = createTestableJobInstance();
jobBuilder.withOutputKey(XVLRKey.class);
jobBuilder.withOutputValue(XVLRValue.class);
jobBuilder.withMapper(POIMapper.class);
jobBuilder.withReducer(POIReducer.class);
jobBuilder.withInputFormat(TextInputFormat.class);
jobBuilder.withOutputFormat(TextOutputFormat.class);
jobBuilder.withPartitioner(XVLRKeyPartitioner.class);
jobBuilder.withSortComparator(XVLRCompositeKeyComparator.class);
jobBuilder.withGroupingComparator(XVLRKeyGroupingComparator.class);
boolean result = buildSubmitAndWaitForCompletion(jobBuilder);
MatcherAssert.assertThat(result, Matchers.is(true));
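import org.apache.hadoop.mapreduce.Partitioner;

public class XVLRKeyPartitioner extends Partitioner<XVLRKey, XVLRValue> {
    @Override
    public int getPartition(XVLRKey key, XVLRValue value, int numPartitions) {
        // Mask the sign bit rather than using Math.abs: Math.abs(Integer.MIN_VALUE) is still negative.
        return ((key.getMsisdn().hashCode() * 127) & Integer.MAX_VALUE) % numPartitions;
    }
}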
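A partition index has to fall in [0, numPartitions). Because `Math.abs(Integer.MIN_VALUE)` returns `Integer.MIN_VALUE` itself, a `Math.abs`-based formula yields a negative index for that one hash value (which a multiplied hash can reach through overflow). Masking the sign bit with `& Integer.MAX_VALUE` is what Hadoop's `HashPartitioner` does. The small plain-Java comparison below uses a hypothetical `PartitionIndex` helper to show the difference:

```java
// Hypothetical helper comparing two ways of turning a hash into a partition index.
class PartitionIndex {

    // Math.abs-based: negative for Integer.MIN_VALUE, because Math.abs cannot represent +2^31.
    static int absBased(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    // Sign-bit masking: always in [0, numPartitions) for a positive numPartitions.
    static int masked(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(absBased(Integer.MIN_VALUE, 3)); // prints -2
        System.out.println(masked(Integer.MIN_VALUE, 3));   // prints 0
    }
}
```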
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class XVLRCompositeKeyComparator extends WritableComparator {
    protected XVLRCompositeKeyComparator() {
        super(XVLRKey.class, true);
    }

    @Override
    public int compare(WritableComparable writable1, WritableComparable writable2) {
        XVLRKey key1 = (XVLRKey) writable1;
        XVLRKey key2 = (XVLRKey) writable2;
        return key1.compareTo(key2);
    }
}
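`XVLRCompositeKeyComparator` delegates to `XVLRKey.compareTo`, whose body is not shown in the question. For the sort described above (id, creationDate, state), that method would need to compare msisdn first, then dateOccurrence, then state. A sketch of that ordering on a hypothetical plain-Java `CompositeKey` (no Hadoop types, so no `write`/`readFields` here):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for XVLRKey's ordering; plain Java, no Hadoop dependency.
class CompositeKey implements Comparable<CompositeKey> {
    final long msisdn;
    final long dateOccurrence;
    final boolean state;

    CompositeKey(long msisdn, long dateOccurrence, boolean state) {
        this.msisdn = msisdn;
        this.dateOccurrence = dateOccurrence;
        this.state = state;
    }

    // Natural key first, then the secondary-sort fields in priority order.
    @Override
    public int compareTo(CompositeKey other) {
        int cmp = Long.compare(msisdn, other.msisdn);
        if (cmp != 0) {
            return cmp;
        }
        cmp = Long.compare(dateOccurrence, other.dateOccurrence);
        if (cmp != 0) {
            return cmp;
        }
        return Boolean.compare(state, other.state); // false sorts before true
    }

    @Override
    public String toString() {
        return msisdn + " " + dateOccurrence + " " + state;
    }

    public static void main(String[] args) {
        List<CompositeKey> keys = new ArrayList<>(List.of(
                new CompositeKey(2, 100, true),
                new CompositeKey(1, 789, true),
                new CompositeKey(1, 123, true),
                new CompositeKey(1, 123, false)));
        Collections.sort(keys);
        System.out.println(keys); // prints [1 123 false, 1 123 true, 1 789 true, 2 100 true]
    }
}
```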
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class XVLRKeyGroupingComparator extends WritableComparator {
    protected XVLRKeyGroupingComparator() {
        super(XVLRKey.class, true);
    }

    @Override
    public int compare(WritableComparable writable1, WritableComparable writable2) {
        XVLRKey key1 = (XVLRKey) writable1;
        XVLRKey key2 = (XVLRKey) writable2;
        // Group on the natural key (msisdn) only
        return key1.getMsisdn().compareTo(key2.getMsisdn());
    }
}
public class XVLRKey implements WritableComparable<XVLRKey> {
    private final LongWritable msisdn;
    private final LongWritable dateOccurrence;
    private final BooleanWritable state;
    // getters-setters
}
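`XVLRKey`'s `write`/`readFields` are elided too. A Writable key has to read its fields back in exactly the order it wrote them, and `readFields` should overwrite every field, since Hadoop may deserialize successive records into the same, reused key instance. The sketch below mirrors the `write(DataOutput)` / `readFields(DataInput)` shape of Hadoop's `Writable` on a hypothetical `KeyFields` class using only `java.io`, and reads two records into one reused object:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical: same method shapes as Hadoop's Writable, but no Hadoop dependency.
class KeyFields {
    long msisdn;
    long dateOccurrence;
    boolean state;

    // Serialize the fields in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeLong(msisdn);
        out.writeLong(dateOccurrence);
        out.writeBoolean(state);
    }

    // Read them back in the same order, overwriting every field of this (possibly reused) instance.
    void readFields(DataInput in) throws IOException {
        msisdn = in.readLong();
        dateOccurrence = in.readLong();
        state = in.readBoolean();
    }

    static KeyFields of(long msisdn, long dateOccurrence, boolean state) {
        KeyFields k = new KeyFields();
        k.msisdn = msisdn;
        k.dateOccurrence = dateOccurrence;
        k.state = state;
        return k;
    }

    static byte[] serialize(KeyFields... keys) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (KeyFields k : keys) {
                k.write(out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads `count` records into ONE reused instance and records what it holds after each read.
    static List<String> readIntoReusedInstance(byte[] data, int count) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        KeyFields reused = new KeyFields();
        List<String> seen = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                reused.readFields(in);
                seen.add(reused.toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return seen;
    }

    @Override
    public String toString() {
        return msisdn + " " + dateOccurrence + " " + state;
    }

    public static void main(String[] args) {
        byte[] data = serialize(of(1, 123, true), of(1, 456, false));
        System.out.println(readIntoReusedInstance(data, 2)); // prints [1 123 true, 1 456 false]
    }
}
```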
public class XVLRValue implements WritableComparable<XVLRValue> {
    private final LongWritable lac;
    private final LongWritable cellId;
    private final LongWritable timeDelta;
    private final LongWritable dateOccurrence;
    private final BooleanWritable state;
    // getters-setters
}
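Note that XVLRKey and XVLRValue do have duplicated fields. I copied dateOccurrence and state into XVLRKey because I want the values to arrive sorted in the reducer; they should be sorted by dateOccurrence.
I can't find a way to solve this without the duplication.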
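One possible way around the duplication, assuming Hadoop's new-API reduce context behaves as I understand it: the key object passed to `reduce()` is reused, and each record's key is deserialized into it as the values iterator advances. If that holds, reading `key.getDateOccurrence()` inside the values loop (as `print()` already does) gives the date of the record currently being iterated, so `XVLRValue` would not need its own `dateOccurrence` and `state`. The code below only simulates that behaviour in plain Java; `ReusedKeySimulation`, `Key`, `Value`, `MapRecord` and `valuesFor` are hypothetical names, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Plain-Java simulation (hypothetical names) of a reduce context that reuses one key object.
class ReusedKeySimulation {

    // Mutable key, overwritten for every record, like a reused Writable key.
    static final class Key {
        long msisdn;
        long dateOccurrence;
        boolean state;
    }

    // The value carries only what is not in the key.
    static final class Value {
        final long timeDelta;

        Value(long timeDelta) {
            this.timeDelta = timeDelta;
        }
    }

    // One already-sorted map output record of a single group.
    static final class MapRecord {
        final long msisdn;
        final long dateOccurrence;
        final boolean state;
        final long timeDelta;

        MapRecord(long msisdn, long dateOccurrence, boolean state, long timeDelta) {
            this.msisdn = msisdn;
            this.dateOccurrence = dateOccurrence;
            this.state = state;
            this.timeDelta = timeDelta;
        }
    }

    // Values iterable that copies each record's key fields into the shared key on next().
    static Iterable<Value> valuesFor(List<MapRecord> group, Key sharedKey) {
        return () -> new Iterator<Value>() {
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < group.size();
            }

            @Override
            public Value next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                MapRecord r = group.get(i++);
                sharedKey.msisdn = r.msisdn;
                sharedKey.dateOccurrence = r.dateOccurrence;
                sharedKey.state = r.state;
                return new Value(r.timeDelta);
            }
        };
    }

    // "Reducer": takes date and state from the key while iterating the values.
    static List<String> reduce(Key key, Iterable<Value> values) {
        List<String> lines = new ArrayList<>();
        for (Value v : values) {
            lines.add(key.msisdn + "\t" + key.dateOccurrence + "\t" + key.state + "\t" + v.timeDelta);
        }
        return lines;
    }

    static List<String> run(List<MapRecord> sortedGroup) {
        Key key = new Key();
        return reduce(key, valuesFor(sortedGroup, key));
    }

    public static void main(String[] args) {
        List<MapRecord> group = List.of(
                new MapRecord(1, 123, true, 6),
                new MapRecord(1, 456, false, 6),
                new MapRecord(1, 789, true, 7));
        run(group).forEach(System.out::println);
    }
}
```

If the real framework behaves this way, a side effect worth keeping in mind is that storing `key` itself (rather than copying its fields) while iterating would leave every stored reference pointing at the last record's values.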