0

我有这个 hadoop map reduce 代码,它适用于图形数据(以邻接表形式),有点类似于邻接表到邻接表转换算法。主要的 MapReduce 任务代码如下:

public class TestTask extends Configured
implements Tool {

public static class TTMapper extends MapReduceBase
    implements Mapper<Text, TextArrayWritable, Text, NeighborWritable> {

    @Override
    public void map(Text key, 
            TextArrayWritable value,
            OutputCollector<Text, NeighborWritable> output, 
            Reporter reporter) throws IOException {

        int numNeighbors = value.get().length;
        double weight = (double)1 / numNeighbors;

        Text[] neighbors = (Text[]) value.toArray();

        NeighborWritable me = new NeighborWritable(key, new DoubleWritable(weight));

        for (int i = 0; i < neighbors.length; i++) {
            output.collect(neighbors[i], me);
        }   
    }       
}

public static class TTReducer extends MapReduceBase
    implements Reducer<Text, NeighborWritable, Text, Text> {

    @Override
    public void reduce(Text key, 
                        Iterator<NeighborWritable> values,
                        OutputCollector<Text, Text> output, 
                        Reporter arg3)
            throws IOException {

        ArrayList<NeighborWritable> neighborList = new ArrayList<NeighborWritable>();

        while(values.hasNext()) {
            neighborList.add(values.next());
        }

        NeighborArrayWritable neighbors = new NeighborArrayWritable
                            (neighborList.toArray(new NeighborWritable[0]));

        Text out = new Text(neighbors.toString());

        output.collect(key, out);

    }

}

@Override
public int run(String[] arg0) throws Exception {
    JobConf conf = Util.getMapRedJobConf("testJob",
                                         SequenceFileInputFormat.class, 
                                         TTMapper.class, 
                                         Text.class, 
                                         NeighborWritable.class, 
                                         1, 
                                         TTReducer.class, 
                                         Text.class, 
                                         Text.class, 
                                         TextOutputFormat.class, 
                                         "test/in", 
                                         "test/out");
    JobClient.runJob(conf);
    return 0;
}

public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new TestTask(), args);
    System.exit(res);
}

}

辅助代码如下:TextArrayWritable:

/**
 * An {@link ArrayWritable} whose element type is fixed to {@link Text}.
 */
public class TextArrayWritable extends ArrayWritable {

    /** Creates an instance with element class {@code Text} and no values set. */
    public TextArrayWritable() {
        super(Text.class);
    }

    /**
     * Creates an instance with element class {@code Text} wrapping the given values.
     *
     * @param values the elements to wrap
     */
    public TextArrayWritable(Text[] values) {
        super(Text.class, values);
    }

}

NeighborWritable:

/**
 * A weighted reference to a graph node: the node's id plus a weight.
 *
 * <p>Instances created through the no-argument constructor have both fields
 * {@code null} until {@link #readFields(DataInput)} or the setters are called.
 */
public class NeighborWritable implements Writable {

    private Text nodeId;
    private DoubleWritable weight;

    /**
     * @param nodeId id of the referenced node
     * @param weight weight attached to the reference
     */
    public NeighborWritable(Text nodeId, DoubleWritable weight) {
        this.nodeId = nodeId;
        this.weight = weight;
    }

    /** Creates an empty instance with both fields {@code null}. */
    public NeighborWritable() { }

    public Text getNodeId() {
        return nodeId;
    }

    public DoubleWritable getWeight() {
        return weight;
    }

    public void setNodeId(Text nodeId) {
        this.nodeId = nodeId;
    }

    public void setWeight(DoubleWritable weight) {
        this.weight = weight;
    }

    /**
     * Replaces both fields with fresh objects populated from {@code in}:
     * first the node id, then the weight.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        nodeId = new Text();
        nodeId.readFields(in);

        weight = new DoubleWritable();
        weight.readFields(in);
    }

    /**
     * Writes the node id followed by the weight.
     *
     * @throws NullPointerException if either field has not been set
     */
    @Override
    public void write(DataOutput out) throws IOException {
        nodeId.write(out);
        weight.write(out);
    }

    @Override
    public String toString() {
        return "NW[nodeId=" + (nodeId != null ? nodeId.toString() : "(null)") +
            ",weight=" + (weight != null ? weight.toString() : "(null)") + "]";
    }

    /**
     * Two instances are equal when their node ids and weights are equal;
     * unset ({@code null}) fields only match other unset fields.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof NeighborWritable)) {
            return false;
        }

        NeighborWritable that = (NeighborWritable) o;

        return nullSafeEquals(nodeId, that.getNodeId())
            && nullSafeEquals(weight, that.getWeight());
    }

    /** Consistent with {@link #equals(Object)}: derived from the same two fields. */
    @Override
    public int hashCode() {
        int result = (nodeId != null ? nodeId.hashCode() : 0);
        result = 31 * result + (weight != null ? weight.hashCode() : 0);
        return result;
    }

    /** Returns true when both arguments are null or {@code a.equals(b)}. */
    private static boolean nullSafeEquals(Object a, Object b) {
        return a == null ? b == null : a.equals(b);
    }

}

和 Util 类:

/**
 * Helper for assembling old-API ({@code JobConf}) map/reduce job configurations.
 */
public class Util {

    /**
     * Builds a {@link JobConf} for a job with the given classes and directories.
     *
     * <p>When {@code numReducer} is 0 the job is configured as map-only and the
     * map-output and reducer classes are ignored. Any other value only selects the
     * map+reduce setup; the value itself is not applied as the reducer count.
     *
     * <p>Side effect: an existing {@code outputDir} is deleted recursively.
     *
     * @param jobName             job name, or {@code null} to leave it unset
     * @param inputFormatClass    input format
     * @param mapperClass         mapper implementation
     * @param mapOutputKeyClass   map output key type (map+reduce jobs only)
     * @param mapOutputValueClass map output value type (map+reduce jobs only)
     * @param numReducer          0 for a map-only job, anything else for map+reduce
     * @param reducerClass        reducer implementation (map+reduce jobs only)
     * @param outputKeyClass      job output key type
     * @param outputValueClass    job output value type
     * @param outputFormatClass   output format
     * @param inputDir            input directory (not a file)
     * @param outputDir           output directory (not a file)
     * @return the populated configuration
     * @throws IOException if the file system cannot be accessed
     */
    public static JobConf getMapRedJobConf(String jobName,
                                           Class<? extends InputFormat> inputFormatClass,
                                           Class<? extends Mapper> mapperClass,
                                           Class<?> mapOutputKeyClass,
                                           Class<?> mapOutputValueClass,
                                           int numReducer,
                                           Class<? extends Reducer> reducerClass,
                                           Class<?> outputKeyClass,
                                           Class<?> outputValueClass,
                                           Class<? extends OutputFormat> outputFormatClass,
                                           String inputDir,
                                           String outputDir) throws IOException {

        JobConf jobConf = new JobConf();

        if (jobName != null) {
            jobConf.setJobName(jobName);
        }

        jobConf.setInputFormat(inputFormatClass);
        jobConf.setMapperClass(mapperClass);

        boolean mapOnly = (numReducer == 0);
        if (mapOnly) {
            jobConf.setNumReduceTasks(0);
        } else {
            // The reducer count is intentionally left at the framework default;
            // numReducer is not passed to setNumReduceTasks here.
            jobConf.setMapOutputKeyClass(mapOutputKeyClass);
            jobConf.setMapOutputValueClass(mapOutputValueClass);
            jobConf.setReducerClass(reducerClass);
        }

        // Final output settings are the same for map-only and map+reduce jobs.
        jobConf.setOutputKeyClass(outputKeyClass);
        jobConf.setOutputValueClass(outputValueClass);
        jobConf.setOutputFormat(outputFormatClass);

        // Remove any existing target output folder.
        Path outputPath = new Path(outputDir);
        FileSystem.get(jobConf).delete(outputPath, true);

        // Input and output are DIRECTORIES (not files).
        FileInputFormat.addInputPath(jobConf, new Path(inputDir));
        FileOutputFormat.setOutputPath(jobConf, outputPath);

        return jobConf;
    }

}

我的输入如下图:(二进制格式,这里我给出文本格式)

1   2
2   1,3,5
3   2,4
4   3,5
5   2,4

根据代码的逻辑,输出应该是:

1   NWArray[size=1,{NW[nodeId=2,weight=0.3333333333333333],}]
2   NWArray[size=3,{NW[nodeId=5,weight=0.5],NW[nodeId=3,weight=0.5],NW[nodeId=1,weight=1.0],}]
3   NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=4,weight=0.5],}]
4   NWArray[size=2,{NW[nodeId=5,weight=0.5],NW[nodeId=3,weight=0.5],}]
5   NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=4,weight=0.5],}]

但输出如下:

1   NWArray[size=1,{NW[nodeId=2,weight=0.3333333333333333],}]
2   NWArray[size=3,{NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],}]
3   NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=2,weight=0.3333333333333333],}]
4   NWArray[size=2,{NW[nodeId=5,weight=0.5],NW[nodeId=5,weight=0.5],}]
5   NWArray[size=2,{NW[nodeId=2,weight=0.3333333333333333],NW[nodeId=2,weight=0.3333333333333333],}]

我无法理解预期输出没有出现的原因。任何帮助将不胜感激。

谢谢。

4

2 回答 2

3

你遇到的是对象重用(object reuse)的问题:

// Problem: each values.next() call returns the same reused instance, so the list
// ends up holding several references to one object.
while(values.hasNext()) {
    neighborList.add(values.next());
}

values.next() 每次返回的都是同一个对象引用,但该对象的底层内容会在每次迭代时发生变化(框架会调用 readFields 方法重新填充其内容)。

建议改成下面这样(你需要在 setup/configure 方法中拿到 conf 这个 Configuration 变量,除非你能从 Reporter 或 OutputCollector 中获取它——抱歉,我不使用旧 API):

// Store an independent copy of every value, because the iterator reuses one instance.
while(values.hasNext()) {
    neighborList.add(
        ReflectionUtils.copy(conf, values.next(), new NeighborWritable()));
}
于 2012-07-12T14:32:37.390 回答
0

但是我仍然不明白为什么我的单元测试通过了。这是代码 -

/**
 * Unit test for {@code TTReducer#reduce}.
 *
 * <p>The value iterator used here returns one shared {@code NeighborWritable}
 * instance whose contents are overwritten on each {@code next()} call, the way
 * the Hadoop framework does. An iterator over distinct objects (for example
 * {@code Arrays.asList(...).iterator()}) cannot expose a reducer that stores the
 * returned references instead of copies.
 */
public class UWLTInitReducerTest {

    private Text key;
    private Iterator<NeighborWritable> values;
    private NeighborArrayWritable nodeData;
    private TTReducer reducer;

    /**
     * Set up the states for calling the reduce function
     */
    @Before
    public void setUp() throws Exception {
        key = new Text("1001");
        final NeighborWritable[] neighbors = new NeighborWritable[4];
        for (int i = 0; i < 4; i++) {
            neighbors[i] = new NeighborWritable(new Text("300" + i), new DoubleWritable((double) 1 / (1 + i)));
        }

        // Object-reusing iterator: always returns the same instance, refilled per element.
        values = new Iterator<NeighborWritable>() {
            private final NeighborWritable shared = new NeighborWritable();
            private int position = 0;

            @Override
            public boolean hasNext() {
                return position < neighbors.length;
            }

            @Override
            public NeighborWritable next() {
                if (!hasNext()) {
                    throw new java.util.NoSuchElementException();
                }
                NeighborWritable source = neighbors[position++];
                shared.setNodeId(source.getNodeId());
                shared.setWeight(source.getWeight());
                return shared;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };

        // Expected neighbors, kept as distinct objects that the iterator never mutates.
        nodeData = new NeighborArrayWritable(neighbors);

        reducer = new TTReducer();

    }

    /**
     * Test method for TTReducer#reduce - valid input
     *
     * @throws IOException propagated so that a failing reduce call fails the test
     *                     instead of being printed and ignored
     */
    @Test
    public void testMapValid() throws IOException {

        // mock the output object; the reducer emits (Text, Text)
        @SuppressWarnings("unchecked")
        OutputCollector<Text, Text> output = mock(OutputCollector.class);

        // call the API
        reducer.reduce(key, values, output, null);

        // the reducer writes the string form of the collected neighbor array
        verify(output).collect(key, new Text(nodeData.toString()));

    }

}

为什么这段代码没有捕捉到错误?

于 2012-07-12T19:17:52.323 回答