
I am writing a Hadoop application, but it seems I have misunderstood how Hadoop works. My input files are map tiles, named according to the QuadTile principle. I need to subsample them and stitch them together until I have a higher-level tile that covers a larger area at a lower resolution, much like zooming out in Google Maps.

One thing I did was write a mapper that runs on each (unsplittable) tile, like this:

public void map(Text keyT, ImageWritable value, Context context) throws IOException, InterruptedException {

    String key = keyT.toString();

    // check whether file needs to be processed
    if (key.startsWith(context.getJobName(), 0)) {
        String newKey = key.substring(0, key.length() - 1);
        ImageWritable iw = subSample(value);
        char region = key.charAt(key.length() - 1);
        iw.setRegion(region);
        context.write(new Text(newKey), iw);
    } else {
        // tile not needed in calculation
    }
}

My reducer looks like this:

public void reduce(Text key, Iterable<ImageWritable> values, Context context) throws IOException, InterruptedException{

    ImageWritable higherLevelTile = new ImageWritable();
    for(ImageWritable s : values){
        int width = s.getWidth();
        int height = s.getHeight();
        char c = Character.toUpperCase(s.getRegion());
        int basex=0, basey=0;
        if(c=='A'){
            basex = basey = 0;
        }else if(c=='B'){
            basex = width;
            basey = 0;
        }else if(c=='C'){
            basex = 0;
            basey = height;             
        }else{
            basex = width;
            basey = height;
        }

        BufferedImage toDraw = s.getBufferedImage();
        Graphics g = higherLevelTile.getBufferedImage().getGraphics();
        g.drawImage(toDraw, basex, basey, null);
    }               
    context.write(key, higherLevelTile);

}

As you can probably tell from my code, I expected Hadoop to proceed as follows: 1) map all tiles of the first level; 2) do a first reduce, where I expected the Iterable of values to hold four elements: the four subsampled tiles from the lower level; 3) map all tiles that are now in the context; 4) reduce all tiles in the context, where again the Iterable of values would have four elements; 5) ... repeat ...; 6) when there is nothing left to map -> write the output.

It turns out this is not how it works: my reducer is called after every map, and the Iterable never seems to have more than one element. I tried to work around this by changing the reducer code slightly, assuming the Iterable would hold two elements: one subsampled value and one partially assembled higher-level tile. That turned out to be wrong as well.

Can anyone tell me, or point me towards, how the Hadoop flow actually works? What should I do to make my use case work? I hope I explained it clearly.


1 Answer


Your assumption is right that all of the maps complete before the first reduce starts. That is because each reduce is guaranteed to get its input in sorted order and the last map to finish may produce the first key for all of the reduces.

As each map produces its output, a pluggable interface called the partitioner picks the reduce that should receive each key. The default uses key.hashCode() % num_reduces, because that gives a good distribution in the normal case. That might be your problem, since there is no requirement that "A", "AB", and "ABC" go to the same reduce.
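
If that turns out to be the issue, one option is to plug in a custom partitioner that routes keys by a shared prefix instead of by the hash of the full key. The following is only a sketch under that assumption: the class name and PREFIX_LENGTH are made up for illustration, and ImageWritable is your own value type.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class TilePrefixPartitioner extends Partitioner<Text, ImageWritable> {

    // Illustrative value only: how many leading characters of the tile key
    // decide which reduce receives the record.
    private static final int PREFIX_LENGTH = 1;

    @Override
    public int getPartition(Text key, ImageWritable value, int numPartitions) {
        String k = key.toString();
        String prefix = k.length() > PREFIX_LENGTH ? k.substring(0, PREFIX_LENGTH) : k;
        // Mask off the sign bit so the modulo result is never negative.
        return (prefix.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be registered in the driver with job.setPartitionerClass(TilePrefixPartitioner.class).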

Finally, each reduce is called once for each of its keys, and the iterator walks over the values associated with that key. Note that the values are not sorted by default, but that can be controlled with a secondary sort.

Have a look at: http://riccomini.name/posts/hadoop/2009-11-13-sort-reducer-input-value-hadoop/

If you want an example of a secondary sort, I wrote one and put it into the Hadoop examples: http://svn.apache.org/repos/asf/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/SecondarySort.java
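
For reference, the driver-side wiring for a secondary sort typically touches the three hooks below. This is only a sketch: the partitioner and comparator class names are placeholders rather than classes from the linked example, and the usual approach is to move the field you want the values ordered by (for example the region character) into a composite key.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class LevelJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "subsample-tiles");

        // Placeholder classes, shown only to mark which hooks a secondary sort uses.
        job.setPartitionerClass(CompositeKeyPartitioner.class);             // which reduce receives each key
        job.setSortComparatorClass(CompositeKeySortComparator.class);       // full key ordering, including the secondary field
        job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class); // group values by the natural key only
    }
}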

answered 2012-11-17T15:56:49.837