我正在努力理解 mapreduce 中的数据流。最近,当我的磁盘在减少阶段内存不足时,一项要求非常高的工作崩溃了。我发现很难估计我的工作需要多少磁盘。我将详细描述数据流。
如果有人可以更正、详细说明 mapreduce 中的数据流或就我的系统尺寸提供建议,那将很有帮助。
集群配置:
我有一个包含30 个从站的集群
- 12 GB 内存
- 100 GB 硬盘
- 4核
我的地图任务与 wordcount 非常相似,因此它们需要的内存很少。我的reduce 任务适用于单词的排列组。由于需要加入相同的单词,reduce 函数需要一个始终 <= 3GB 的临时哈希映射。
由于我有 12GB 的 RAM 并且我的 hadoop 守护进程需要 1GB 堆 + 500MB 用于操作系统,我将 map/reduce 插槽划分如下:
4 个 900MB 堆的 map 槽和2 个 3GB 堆的 reduce 槽。由于映射槽不需要超过 300MB 的内存,因此我将其设置io.sort.mb
为 500 MB 以改进映射阶段的内存排序。
我的工作有1800 个地图任务,每个任务生成 8 GB 的地图输出。由于我使用 BZIP2 进行压缩,因此可以压缩到 1 GB。这意味着当我有 3 TB 的内存时,总地图输出将低于 2 TB 。
我选择了 100 个 reduce 任务,每个任务产生 5 GB 的输出。
乍一看,一切都应该适合记忆。但显然排序阶段需要压缩和解压缩,而复制阶段需要数据同时在 2 个地方(我假设)。所以这就是它变得棘手的地方,这就是为什么我想完全理解数据流。这是我认为它的工作原理,但如果我错了,请纠正我:
数据流
映射任务会生成许多溢出(在我的情况下为 200 个),它们在内存中排序,然后在写入本地磁盘之前进行压缩。一旦地图任务完成,这会给我 200 个溢出文件,每 10 个 ( )合并io.sort.factor
。这意味着解压缩了 10 个文件:10 x (5MB -> 40MB),因此这会产生 0.4GB 的压缩/解压缩开销. 虽然我不确定 200 次泄漏第一次合并后会发生什么。我想他们在每个减少任务中首先被洗牌?所以文件的大小不会真正增加太多。如果我们从黑盒的角度来看,这意味着我们从 200 个压缩溢出开始,最终得到 100 个用于 reduce 任务的压缩文件(每个任务 1 个)。
由于我只有 60 个减速器,现在每个节点60 个压缩文件被复制到减速器,这已经在映射阶段完成。这可能意味着压缩文件临时存在于源和目标上。这意味着在这种情况下,每个节点的内存需求(暂时)上升 160 个压缩文件,是映射输出的 1.6 倍。地图输出为 1800 GB,所以我们最终得到 2880 GB,尽管是暂时的。所以第一个reduce阶段应该能够开始并且确实如此。复制后(我希望!)数据从映射器本地输出目录中删除,所以我们有与地图输出相同的数据量,再次为 1800 GB。
现在 reducer 中的排序阶段开始了。我希望它在映射器的内存被清除之前不要启动?!由于要合并1800个map任务的输出,所以必须解压。reduce 任务的输入大约是 mapoutput / 100 = 18 GB 的压缩数据。现在这是如何解压的,它不能一次全部解压,因为那时我每个节点会有 144GB,而且由于我的工作没有崩溃,所以解压的执行会稍微智能一些。我会以与映射阶段相同的方式思考:同时解压缩和合并 10 个文件(1800 个任务输出)。然后,解压缩将在每个合并轮中产生 18GB/180 = 100 MB 的开销。问题再次是最后几轮合并是如何发生的,我记得在 hadoop 参考中读到,reducers 不会继续合并,直到只剩下一个文件。
在 reduce 阶段排序之后,reduce 阶段运行,这需要对输入记录进行解压缩,但由于每个 reduce 任务都使用 500 个输入键组,这应该不是一个真正的问题。
如前所述,reduce 任务向 DFS 生成大约 5GB 的输出(总共 0.5TB)。
在前 60 个 reduce 任务完成后,这项工作真的会遇到麻烦。在第二轮中,任务在排序阶段开始崩溃,这让我认为这与复制开销或解压缩开销有关。
确切的例外是:org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3
我希望我足够详细地解释了我的程序流程以及我对 mapreduce 的理解。如果出现以下情况,我将非常感激:
- 有人可以清除有关复制阶段和合并阶段的烟雾
- 以及提供克服工作崩溃的建议。
- 能够准确估计我需要多少内存对我来说是理想的,因为如果我尝试一个有 40 个节点的集群在运行 5 天后崩溃(就像这次所经历的那样),那将是不愉快的,因为截止日期是越来越接近。
提前致谢
我的工作失败的堆栈跟踪在这里:
例外一:
org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:510)
at org.apache.hadoop.mapred.Merger.merge(Merger.java:142)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.createKVIterator(ReduceTask.java:2539)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.access$400(ReduceTask.java:661)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:399)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
例外 2:
FAILEDjava.io.IOException: Task: attempt_201310160819_0001_r_000075_1 - The reduce copier failed
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/map_1622.out
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
at org.apache.hadoop.mapred.MapOutputFile.getInputFileForWrite(MapOutputFile.java:176)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2798)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2762)
异常3:(可能是diskchecker异常引起的)
Task attempt_201310160819_0001_r_000077_1 failed to report status for 2400 seconds. Killing!