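This is a fairly common problem, but I don't know which approach to choose.

I have these fields: id, creationDate, state, dateDiff

id is a natural key.

In my reducer I need to receive: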

KEY(id), VALUE(creationDate, state, dateDiff)

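VALUE(creationDate, state, dateDiff) should be sorted by: creationDate, state

Which key should I choose? I created a composite key (id, creationDate, state).

I implemented a partitioner on id,

a grouping comparator on id,

and a sort comparator on id, creationDate, state.

My reducer only receives unique ids. For example, given: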

1 123 true  6
1 456 false 6
1 789 true  7

I only get

1 123 true  6

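in my reducer. It looks like I haven't understood the sort comparator, partitioner, and grouping comparator :( There is a lot to take in.

Here is my code: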

public class POIMapper extends Mapper<LongWritable, Text, XVLRKey, XVLRValue>{

    private static final Log LOG = LogFactory.getLog(POIMapper.class);

    @Override
    public void map(LongWritable key, Text csvLine, Context context) throws IOException, InterruptedException {
        Pair<XVLRKey, XVLRValue> xvlrPair = POIUtil.parseKeyAndValue(csvLine.toString(), POIUtil.CSV_DELIMITER);
        context.write(xvlrPair.getValue0(), xvlrPair.getValue1());
    }

}

public class POIReducer extends Reducer<XVLRKey, XVLRValue, LongWritable, Text>{

    private static final Log LOG = LogFactory.getLog(POIReducer.class);

    private final Text textForOutput = new Text();

    @Override
    public void reduce(XVLRKey key, Iterable<XVLRValue> values, Context context)
            throws IOException, InterruptedException {
        XVLROutput out = null;
        // Just check that values are correctly attached to keys. No logic here...
        LOG.info("\nPOIReducer: key:"+key);
        for(XVLRValue value : values){
            LOG.info("\n --- --- --- value:"+value+"\n");
            textForOutput.set(print(key, value));
            context.write(key.getMsisdn(), textForOutput);
        }
    }

    private String print(XVLRKey key, XVLRValue value){
        StringBuilder builder = new StringBuilder();
        builder.append(value.getLac())          .append("\t")
               .append(value.getCellId())       .append("\t")
               .append(key.getDateOccurrence()) .append("\t")
               .append(value.getTimeDelta());
        return builder.toString();
    }
}

Job code:

JobBuilder<POIJob> jobBuilder = createTestableJobInstance();

        jobBuilder.withOutputKey(XVLRKey.class);
        jobBuilder.withOutputValue(XVLRValue.class);

        jobBuilder.withMapper(POIMapper.class);
        jobBuilder.withReducer(POIReducer.class);

        jobBuilder.withInputFormat(TextInputFormat.class);
        jobBuilder.withOutputFormat(TextOutputFormat.class);

        jobBuilder.withPartitioner(XVLRKeyPartitioner.class);
        jobBuilder.withSortComparator(XVLRCompositeKeyComparator.class);
        jobBuilder.withGroupingComparator(XVLRKeyGroupingComparator.class);

        boolean result = buildSubmitAndWaitForCompletion(jobBuilder);
        MatcherAssert.assertThat(result, Matchers.is(true));




public class XVLRKeyPartitioner extends Partitioner<XVLRKey, XVLRValue> {

    @Override
    public int getPartition(XVLRKey key, XVLRValue value, int numPartitions) {
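        // Mask the sign bit rather than using Math.abs, which stays negative for Integer.MIN_VALUE.
        return ((key.getMsisdn().hashCode() * 127) & Integer.MAX_VALUE) % numPartitions;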
    }
}

public class XVLRCompositeKeyComparator extends WritableComparator {

    protected XVLRCompositeKeyComparator() {
        super(XVLRKey.class, true);
    }

    @Override
    public int compare(WritableComparable writable1, WritableComparable writable2) {
        XVLRKey key1 = (XVLRKey) writable1;
        XVLRKey key2 = (XVLRKey) writable2;
        return key1.compareTo(key2);
    }
}

public class XVLRKeyGroupingComparator extends WritableComparator {

    protected XVLRKeyGroupingComparator() {
        super(XVLRKey.class, true);
    }

    @Override
    public int compare(WritableComparable writable1, WritableComparable writable2) {

        XVLRKey key1 = (XVLRKey) writable1;
        XVLRKey key2 = (XVLRKey) writable2;

        return key1.getMsisdn().compareTo(key2.getMsisdn());

    }
}

public class XVLRKey implements WritableComparable<XVLRKey>{

    private  final LongWritable msisdn;
    private  final LongWritable dateOccurrence;
    private  final BooleanWritable state;
//getters-setters
}

public class XVLRValue implements WritableComparable<XVLRValue> {

    private final LongWritable lac;
    private final LongWritable cellId;
    private final LongWritable timeDelta;
    private final LongWritable dateOccurrence;
    private final BooleanWritable state;
//getters-setters
}

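Note that XVLRKey and XVLRValue do have duplicated fields. I duplicated dateOccurrence and state in XVLRKey because I want to receive sorted values in the reducer. They should be sorted by dateOccurrence.

I can't find a way to solve this without the duplication.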

1 Answer

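In a secondary sort setup like the one you describe, the contents of the key you hold change each time you retrieve the next value from the iterator.

This happens because the Hadoop framework reuses object instances to avoid object creation and garbage collection as much as possible.

So when you call "next()", the framework also changes the data inside the key instance.

So if you move the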

    LOG.info("\nPOIReducer: key:"+key);

statement so that it is inside the for loop, you should see all the keys.
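
The effect can be imitated without Hadoop. The following is only a minimal sketch in plain Java: `DemoKey`, `KeyReuseDemo`, and `valuesFor` are hypothetical names made up for illustration, and the iterator merely mimics the reuse behaviour described above by overwriting one shared key object on every `next()`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a composite key; not the Hadoop or XVLRKey class.
final class DemoKey {
    long id;
    long date;

    void set(long id, long date) {
        this.id = id;
        this.date = date;
    }

    @Override
    public String toString() {
        return id + "@" + date;
    }
}

final class KeyReuseDemo {

    // Iterates over one group's dates while overwriting a single shared key,
    // imitating a framework that refills the same key object on each next().
    static Iterable<Long> valuesFor(final DemoKey sharedKey, final long id, final long[] dates) {
        return () -> new Iterator<Long>() {
            private int pos = 0;

            @Override
            public boolean hasNext() {
                return pos < dates.length;
            }

            @Override
            public Long next() {
                long date = dates[pos++];
                sharedKey.set(id, date); // the key changes as a side effect
                return date;
            }
        };
    }

    // Records the key as seen inside the loop, once per value.
    // Assumes dates is non-empty.
    static List<String> keysSeenInsideLoop(long id, long[] dates) {
        DemoKey key = new DemoKey();
        key.set(id, dates[0]); // what a log statement before the loop would show
        List<String> seen = new ArrayList<>();
        for (Long ignored : valuesFor(key, id, dates)) {
            seen.add(key.toString());
        }
        return seen;
    }

    public static void main(String[] args) {
        // prints [1@123, 1@456, 1@789]
        System.out.println(keysSeenInsideLoop(1L, new long[] {123L, 456L, 789L}));
    }
}
```

Reading the key once before the loop would only ever show `1@123`, which matches the single key in the question's log output.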

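Because of this effect, I basically follow these "rules" in my own work:

The key is used by the framework only to direct values to the right reducer.

This means

  1. Everything I might need must be present in the value.
  2. In the reducer I only look at the values and always discard/ignore the key.
  3. The attributes used to build the key can also be found in the value.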
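
These rules can be sketched in plain Java, again without Hadoop. Everything here is an assumption made for illustration: `DemoValue`, `SecondarySortDemo`, `shuffle`, and `reduce` are hypothetical names, and the sort-then-group step only approximates what a sort comparator on (id, creationDate, state) plus a grouping comparator on id would do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical value type: it carries every field the reducer needs,
// including the ones that are also used to build the composite key.
final class DemoValue {
    final long id;
    final long creationDate;
    final boolean state;
    final long dateDiff;

    DemoValue(long id, long creationDate, boolean state, long dateDiff) {
        this.id = id;
        this.creationDate = creationDate;
        this.state = state;
        this.dateDiff = dateDiff;
    }

    @Override
    public String toString() {
        return id + " " + creationDate + " " + state + " " + dateDiff;
    }
}

final class SecondarySortDemo {

    // Order of the composite key: id, then creationDate, then state.
    static final Comparator<DemoValue> SORT_ORDER =
            Comparator.comparingLong((DemoValue v) -> v.id)
                      .thenComparingLong(v -> v.creationDate)
                      .thenComparing(v -> v.state);

    // Sorts all records, then groups them by id only, roughly the way a
    // grouping comparator on id hands them to a single reduce call.
    static Map<Long, List<DemoValue>> shuffle(List<DemoValue> mapOutput) {
        List<DemoValue> sorted = new ArrayList<>(mapOutput);
        sorted.sort(SORT_ORDER);
        Map<Long, List<DemoValue>> groups = new LinkedHashMap<>();
        for (DemoValue v : sorted) {
            groups.computeIfAbsent(v.id, k -> new ArrayList<>()).add(v);
        }
        return groups;
    }

    // "Reducer" that ignores the key and builds its output from the values alone.
    static List<String> reduce(List<DemoValue> values) {
        List<String> out = new ArrayList<>();
        for (DemoValue v : values) {
            out.add(v.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<DemoValue> mapOutput = Arrays.asList(
                new DemoValue(1, 789, true, 7),
                new DemoValue(2, 50, false, 1),
                new DemoValue(1, 123, true, 6),
                new DemoValue(1, 456, false, 6));
        for (List<DemoValue> group : shuffle(mapOutput).values()) {
            reduce(group).forEach(System.out::println);
        }
    }
}
```

The duplication of creationDate and state between key and value is deliberate in this sketch: the key fields only drive ordering and grouping, while the reducer reads them back from the value.
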
answered 2013-01-17T16:25:52.897