0

我正在为减少子级获取 OOM 异常(Java 堆空间)。在 reducer 中,我将所有值附加到 StringBuilder,这将是 reducer 进程的输出。值的数量并不多。我试图将值mapred.reduce.child.java.opts增加到 512M 和 1024M 但这没有帮助。减速器代码如下。

            StringBuilder adjVertexStr = new StringBuilder();
        long itcount= 0;
        while(values.hasNext()) {
            adjVertexStr.append(values.next().toString()).append(" ");
            itcount++;
        }
        log.info("Size of iterator: " + itcount);
        multipleOutputs.getCollector("vertex", reporter).collect(key, new Text(""));
        multipleOutputs.getCollector("adjvertex", reporter).collect(adjVertexStr, new Text(""));

在上面的代码中,我在 3 个地方得到了异常。

  1. 在异常堆栈跟踪中,行号指向附加字符串的 while 循环语句。
  2. 在最后一行 - collect() 语句。
  3. 我有一个集合所有值 - 所以没有重复的值。我后来删除了它。

迭代器的一些样本大小如下:238695、1、13、673、1、1等。这些不是很大的值。为什么我不断收到 OOM 异常?任何帮助对我来说都是有价值的。

堆栈跟踪

2012-10-10 21:15:03,929 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 238695                                                                                                   
2012-10-10 21:15:04,190 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,190 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,190 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 13                                                                                                       
2012-10-10 21:15:04,190 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,191 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,193 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 673                                                                                                       
2012-10-10 21:15:04,195 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,196 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,196 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,196 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:04,196 INFO partitioning.UndirectedGraphPartitioner: Size of iterator: 1                                                                                                        
2012-10-10 21:15:09,856 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs`    truncater with mapRetainSize=-1 and reduceRetainSize=-1                                                       
2012-10-10 21:15:09,916 INFO org.apache.hadoop.io.nativeio.NativeIO: Initialized cache for UID to  User mapping with a cache timeout of 14400 seconds.                                                     
2012-10-10 21:15:09,916 INFO org.apache.hadoop.io.nativeio.NativeIO: Got UserName hduser for UID         2006 from the native implementation                                                                      
2012-10-10 21:15:09,922 FATAL org.apache.hadoop.mapred.Child: Error running child :       java.lang.OutOfMemoryError: Java heap space                                                                           
    at java.util.Arrays.copyOf(Arrays.java:2882)                                                                                                                                                      
    at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:100)                                                                                                                 
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:390)                                                                                                                         
    at java.lang.StringBuilder.append(StringBuilder.java:119)                                                                                                                                         
    at partitioning.UndirectedGraphPartitioner$Reduce.reduce(UndirectedGraphPartitioner.java:106)                                                                                            
    at partitioning.UndirectedGraphPartitioner$Reduce.reduce(UndirectedGraphPartitioner.java:82)                                                                                             
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519)                                                                                                                         
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)                                                                                                                                   
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)                                                                                                                                           
    at java.security.AccessController.doPrivileged(Native Method)                                                                                                                                     
    at javax.security.auth.Subject.doAs(Subject.java:396)                                                                                                                                             
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)                                                                                                           
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 
4

1 回答 1

2

因此,对于您的示例,您希望将特定键的值作为空格分隔的值列表(作为输出键)输出,并将空文本作为输出值。

您的输出格式将按如下方式使用 reduce 键/值(这将在您的 reducer 代码中):

for (Text value : values) {
    multipleOutputs.getCollector("adjvertex", reporter)
       .collect(key, value);
}

然后,实际的 recordWriter 将使用该键作为逻辑触发器:

当传递的密钥与先前传递的密钥不同时,先前写入的记录将被关闭(例如,写一个制表符后跟一个换行符)。将更新先前的键并将新值写入输出流。

如果键与前一个键相同,则将一个空格后跟值输出到输出流。

在记录写入器的关闭方法中,执行与传递新键相同的逻辑(输出一个制表符,后跟一个换行符)。

希望这是有道理的。您唯一需要注意的是您是否有自定义组比较器(这将导致记录写入器中的先前键比较失败)。还记得在更新之前的密钥跟踪变量时制作密钥的深层副本。

于 2012-10-11T03:07:00.823 回答