4

在我的 MapReduce 程序的 Reduce 阶段,我执行的唯一操作是连接提供的 Iterator 中的每个值,如下所示:

public void reduce(Text key, Iterator<text> values,
                    OutputCollector<Text, Text> output, Reporter reporter) {
    Text next;
    Text outKey = new Text()
    Text outVal = new Text();
    StringBuilder sb = new StringBuilder();
    while(values.hasNext()) {
        next = values.next();
        sb.append(next.toString());
        if (values.hasNext())
            sb.append(',');
    }
    outKey.set(key.toString());
    outVal.set(sb.toSTring());
    output.collect(outKey,outVal);
}

我的问题是一些减少输出值是大行文本;如此之大,以至于即使初始大小非常大,字符串缓冲区也必须将其大小增加(加倍)数倍以容纳迭代器的所有上下文,从而导致内存问题。

在传统的 Java 应用程序中,这表明缓冲写入文件将是写入输出的首选方法。如何在 Hadoop 中处理超大的输出键值对?我应该将结果直接流式传输到 HDFS 上的文件(每个 reduce 调用一个文件)吗?除了 output.collect 方法之外,有没有办法缓冲输出?

注意:我已经尽可能地增加了我的内存/堆大小。此外,一些消息来源表明,增加 reducer 的数量可以帮助解决内存/堆问题,但这里的问题已直接追溯到 SringBuilder 在扩展其容量时的使用。

谢谢

4

2 回答 2

4

并不是说我理解您为什么想要拥有巨大的价值,但是有一种方法可以做到这一点。

如果您编写自己的 OutputFormat,则可以修复RecordWriter.write(Key, Value)方法的行为以根据 Key 值是否为空来处理值连接。

这样,在您的减速器中,您可以编写如下代码(键的第一个输出是实际键,之后的所有内容都是空键:

public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter) {
  boolean firstKey = true;
  for (Text value : values) {
    output.collect(firstKey ? key : null, value);
    firstKey = false;
  }
}

实际RecordWriter.write()则具有以下逻辑来处理空键/值连接逻辑:

    public synchronized void write(K key, V value) throws IOException {

        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }

        if (!nullKey) {
            // if we've written data before, append a new line
            if (dataWritten) {
                out.write(newline);
            }

            // write out the key and separator
            writeObject(key);
            out.write(keyValueSeparator);
        } else if (!nullValue) {
            // write out the value delimiter
            out.write(valueDelimiter);
        }

        // write out the value
        writeObject(value);

        // track that we've written some data
        dataWritten = true;
    }

    public synchronized void close(Reporter reporter) throws IOException {
        // if we've written out any data, append a closing newline
        if (dataWritten) {
            out.write(newline);
        }

        out.close();
    }

你会注意到 close 方法也被修改为在写出的最后一条记录中写入一个尾随换行符

完整的代码清单可以在pastebin上找到,这里是测试输出:

key1    value1
key2    value1,value2,value3
key3    value1,value2
于 2012-04-14T02:21:50.670 回答
2

如果单个输出键值可以大于内存,则意味着标准输出机制不适合 - 因为通过接口设计,它需要传递键值对而不是流。
我认为最简单的解决方案是将输出直接流式传输到 HDFS 文件。
如果您有理由通过输出格式传递数据 - 我建议采用以下解决方案:a)写入本地临时目录
b)将文件名作为输出格式的值传递。

可能最有效但有点复杂的解决方案是将内存映射文件用作缓冲区。只要有足够的内存,它就会在内存中,并且在需要时操作系统会关心有效地溢出到磁盘。

于 2012-04-13T17:38:10.397 回答