2

我在hadoop map reduce中写了一个简单的hash join程序。思路如下:

使用 hadoop 框架提供的 DistributedCache 将一个小表分发给每个映射器。大表分布在映射器上,拆分大小为 64M。映射器的设置代码创建一个从这个小表中读取每一行的哈希图。在映射器代码中,在哈希映射中搜索(获取)每个键,如果该键存在于哈希映射中,则将其写出。此时不需要减速器。这是我们使用的代码:

    public class Map extends Mapper<LongWritable, Text, Text, Text> {
        private HashMap<String, String> joinData = new HashMap<String, String>();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String textvalue = value.toString();
            String[] tokens;
            tokens = textvalue.split(",");
            if (tokens.length == 2) {
                String joinValue = joinData.get(tokens[0]);
                if (null != joinValue) {
                    context.write(new Text(tokens[0]), new Text(tokens[1] + ","
                            + joinValue));
                }
            }
        }

    public void setup(Context context) {
        try {
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context
                    .getConfiguration());
            if (null != cacheFiles && cacheFiles.length > 0) {
                String line;
                String[] tokens;
                BufferedReader br = new BufferedReader(new FileReader(
                        cacheFiles[0].toString()));
                try {
                    while ((line = br.readLine()) != null) {

                        tokens = line.split(",");
                        if (tokens.length == 2) {
                            joinData.put(tokens[0], tokens[1]);
                        }
                    }
                    System.exit(0);
                } finally {
                    br.close();
                }
            }

        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

测试这段代码时,我们的小表是32M,大表是128M,1个主节点,2个从节点。

当我有 256M 的堆时,此代码因上述输入而失败。我在 mapred-site.xml 文件的 mapred.child.java.opts 中使用 -Xmx256m。当我将其增加到 300m 时,它会非常缓慢地进行,而在 512m 时,它会达到其最大吞吐量。

我不明白我的映射器在哪里消耗了这么多内存。使用上面给出的输入和映射器代码,我不希望我的堆内存达到 256M,但它会因 java 堆空间错误而失败。

如果您能对映射器消耗如此多内存的原因提供一些见解,我将不胜感激。

编辑

13/03/11 09:37:33 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/03/11 09:37:33 INFO input.FileInputFormat: Total input paths to process : 1
13/03/11 09:37:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/03/11 09:37:33 WARN snappy.LoadSnappy: Snappy native library not loaded
13/03/11 09:37:34 INFO mapred.JobClient: Running job: job_201303110921_0004
13/03/11 09:37:35 INFO mapred.JobClient:  map 0% reduce 0%
13/03/11 09:39:12 INFO mapred.JobClient: Task Id : attempt_201303110921_0004_m_000000_0, Status : FAILED
Error: GC overhead limit exceeded
13/03/11 09:40:43 INFO mapred.JobClient: Task Id : attempt_201303110921_0004_m_000001_0, Status : FAILED
org.apache.hadoop.io.SecureIOUtils$AlreadyExistsException: File /usr/home/hadoop/hadoop-1.0.3/libexec/../logs/userlogs/job_201303110921_0004/attempt_201303110921_0004_m_000001_0/log.tmp already exists
    at org.apache.hadoop.io.SecureIOUtils.insecureCreateForWrite(SecureIOUtils.java:130)
    at org.apache.hadoop.io.SecureIOUtils.createForWrite(SecureIOUtils.java:157)
    at org.apache.hadoop.mapred.TaskLog.writeToIndexFile(TaskLog.java:312)
    at org.apache.hadoop.mapred.TaskLog.syncLogs(TaskLog.java:385)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:257)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

attempt_201303110921_0004_m_000001_0: Exception in thread "Thread for syncLogs" java.lang.OutOfMemoryError: Java heap space
attempt_201303110921_0004_m_000001_0:   at java.io.BufferedOutputStream.<init>(BufferedOutputStream.java:76)
attempt_201303110921_0004_m_000001_0:   at java.io.BufferedOutputStream.<init>(BufferedOutputStream.java:59)
attempt_201303110921_0004_m_000001_0:   at org.apache.hadoop.mapred.TaskLog.writeToIndexFile(TaskLog.java:312)
attempt_201303110921_0004_m_000001_0:   at org.apache.hadoop.mapred.TaskLog.syncLogs(TaskLog.java:385)
attempt_201303110921_0004_m_000001_0:   at org.apache.hadoop.mapred.Child$3.run(Child.java:141)
attempt_201303110921_0004_m_000001_0: log4j:WARN No appenders could be found for logger (org.apache.hadoop.hdfs.DFSClient).
attempt_201303110921_0004_m_000001_0: log4j:WARN Please initialize the log4j system properly.
13/03/11 09:42:18 INFO mapred.JobClient: Task Id : attempt_201303110921_0004_m_000001_1, Status : FAILED
Error: GC overhead limit exceeded
13/03/11 09:43:48 INFO mapred.JobClient: Task Id : attempt_201303110921_0004_m_000001_2, Status : FAILED
Error: GC overhead limit exceeded
13/03/11 09:45:09 INFO mapred.JobClient: Job complete: job_201303110921_0004
13/03/11 09:45:09 INFO mapred.JobClient: Counters: 7
13/03/11 09:45:09 INFO mapred.JobClient:   Job Counters 
13/03/11 09:45:09 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=468506
13/03/11 09:45:09 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/03/11 09:45:09 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/03/11 09:45:09 INFO mapred.JobClient:     Launched map tasks=6
13/03/11 09:45:09 INFO mapred.JobClient:     Data-local map tasks=6
13/03/11 09:45:09 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
13/03/11 09:45:09 INFO mapred.JobClient:     Failed map tasks=1
4

1 回答 1

4

很难确定内存消耗的去向,但这里有一些指针:

  • 您正在Text为输入的每一行创建 2 个对象。您应该只使用 2 个Text将在您的类变量中初始化一次的对象,Mapper然后为每一行调用text.set(...). 这是 Map/Reduce 模式的常见使用模式,可以节省相当多的内存开销。
  • 您应该考虑SequenceFile对输入使用格式,这样可以避免使用 解析行textValue.split,而是将这些数据直接作为数组提供。我已经读过好几次这样的字符串拆分可能会非常密集,所以如果内存确实是一个问题,你应该尽可能避免。您也可以考虑使用KeyValueTextInputFormatif,就像在您的示例中一样,您只关心键/值对。

如果这还不够,我建议您查看此链接,尤其是第 7 部分,它为您提供了一种非常简单的方法来分析您的应用程序并查看在哪里分配了哪些内容。

于 2013-03-09T23:40:39.207 回答