0

我正在努力理解 mapreduce 中的数据流。最近,当我的磁盘在减少阶段内存不足时,一项要求非常高的工作崩溃了。我发现很难估计我的工作需要多少磁盘。我将详细描述数据流。

如果有人可以更正、详细说明 mapreduce 中的数据流或就我的系统尺寸提供建议,那将很有帮助。

集群配置:

我有一个包含30 个从站的集群

  • 12 GB 内存
  • 100 GB 硬盘
  • 4核

我的地图任务与 wordcount 非常相似,因此它们需要的内存很少。我的reduce 任务适用于单词的排列组。由于需要加入相同的单词,reduce 函数需要一个始终 <= 3GB 的临时哈希映射。

由于我有 12GB 的 RAM 并且我的 hadoop 守护进程需要 1GB 堆 + 500MB 用于操作系统,我将 map/reduce 插槽划分如下:

4 个 900MB 堆的 map 槽2 个 3GB 堆的 reduce 槽。由于映射槽不需要超过 300MB 的内存,因此我将其设置io.sort.mb为 500 MB 以改进映射阶段的内存排序。

我的工作有1800 个地图任务,每个任务生成 8 GB 的地图输出。由于我使用 BZIP2 进行压缩,因此可以压缩到 1 GB。这意味着当我有 3 TB 的内存时,总地图输出将低于 2 TB 。

我选择了 100 个 reduce 任务,每个任务产生 5 GB 的输出。

乍一看,一切都应该适合记忆。但显然排序阶段需要压缩和解压缩,而复制阶段需要数据同时在 2 个地方(我假设)。所以这就是它变得棘手的地方,这就是为什么我想完全理解数据流。这是我认为它的工作原理,但如果我错了,请纠正我:

数据流

映射任务会生成许多溢出(在我的情况下为 200 个),它们在内存中排序,然后在写入本地磁盘之前进行压缩。一旦地图任务完成,这会给我 200 个溢出文件,每 10 个 ( )合并io.sort.factor。这意味着解压缩了 10 个文件:10 x (5MB -> 40MB),因此这会产生 0.4GB 的压缩/解压缩开销. 虽然我不确定 200 次泄漏第一次合并后会发生什么。我想他们在每个减少任务中首先被洗牌?所以文件的大小不会真正增加太多。如果我们从黑盒的角度来看,这意味着我们从 200 个压缩溢出开始,最终得到 100 个用于 reduce 任务的压缩文件(每个任务 1 个)。

由于我只有 60 个减速器,现在每个节点60 个压缩文件被复制到减速器,这已经在映射阶段完成。这可能意味着压缩文件临时存在于源和目标上。这意味着在这种情况下,每个节点的内存需求(暂时)上升 160 个压缩文件,是映射输出的 1.6 倍。地图输出为 1800 GB,所以我们最终得到 2880 GB,尽管是暂时的。所以第一个reduce阶段应该能够开始并且确实如此。复制后(我希望!)数据从映射器本地输出目录中删除,所以我们有与地图输出相同的数据量,再次为 1800 GB。

现在 reducer 中的排序阶段开始了。我希望它在映射器的内存被清除之前不要启动?!由于要合并1800个map任务的输出,所以必须解压。reduce 任务的输入大约是 mapoutput / 100 = 18 GB 的压缩数据。现在这是如何解压的,它不能一次全部解压,因为那时我每个节点会有 144GB,而且由于我的工作没有崩溃,所以解压的执行会稍微智能一些。我会以与映射阶段相同的方式思考:同时解压缩和合并 10 个文件(1800 个任务输出)。然后,解压缩将在每个合并轮中产生 18GB/180 = 100 MB 的开销。问题再次是最后几轮合并是如何发生的,我记得在 hadoop 参考中读到,reducers 不会继续合并,直到只剩下一个文件。

在 reduce 阶段排序之后,reduce 阶段运行,这需要对输入记录进行解压缩,但由于每个 reduce 任务都使用 500 个输入键组,这应该不是一个真正的问题。

如前所述,reduce 任务向 DFS 生成大约 5GB 的输出(总共 0.5TB)。

在前 60 个 reduce 任务完成后,这项工作真的会遇到麻烦。在第二轮中,任务在排序阶段开始崩溃,这让我认为这与复制开销或解压缩开销有关。

确切的例外是:org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3

我希望我足够详细地解释了我的程序流程以及我对 mapreduce 的理解。如果出现以下情况,我将非常感激:

  • 有人可以清除有关复制阶段和合并阶段的烟雾
  • 以及提供克服工作崩溃的建议。
  • 能够准确估计我需要多少内存对我来说是理想的,因为如果我尝试一个有 40 个节点的集群在运行 5 天后崩溃(就像这次所经历的那样),那将是不愉快的,因为截止日期是越来越接近。

提前致谢

我的工作失败的堆栈跟踪在这里:

例外一:

org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
    at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:510)
    at org.apache.hadoop.mapred.Merger.merge(Merger.java:142)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.createKVIterator(ReduceTask.java:2539)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.access$400(ReduceTask.java:661)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:399)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

例外 2:

FAILEDjava.io.IOException: Task: attempt_201310160819_0001_r_000075_1 - The reduce copier failed
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/map_1622.out
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
    at org.apache.hadoop.mapred.MapOutputFile.getInputFileForWrite(MapOutputFile.java:176)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2798)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2762)

异常3:(可能是diskchecker异常引起的)

Task attempt_201310160819_0001_r_000077_1 failed to report status for 2400 seconds. Killing!
4

1 回答 1

0

我刚刚收到来自 Praveen Sripati 的电子邮件,其中提到了 hadoop 参考,我将其粘贴在这里:

在复制阶段,数据是否同时存在于 map 和 reduce 任务中?地图输出什么时候清零?

以下来自 Hadoop - The Definitive Guide

主机不会在第一个 reducer 检索到 map 输出后立即从磁盘中删除它们,因为 reducer 可能随后会失败。相反,他们会等到作业跟踪器(或应用程序主控器)告诉他们删除它们,这是在作业完成之后。

这很重要,地图输出保留在磁盘上!!在我的情况下有点不幸。

5)然后减速器中的合并开始。不完全确定它是如何完成的。每个reduce键是否合并到一个文件?还是将所有内容合并为一个任务?

又出自同一本书

当所有 map 输出都被复制后,reduce 任务进入排序阶段(应该正确地称为合并阶段,因为排序是在 map 端进行的),它合并 map 输出,保持它们的排序顺序。这是轮流进行的。例如,如果有 50 个地图输出并且合并因子为 10(默认值,由 io.sort.factor 属性控制,就像在地图的合并中一样),则将有五轮。每轮将 10 个文件合并为一个,所以最后会有 5 个中间文件。

谢谢, 普拉文

这意味着合并后的文件数限制为 io.sort.factor。就我而言,有 10 个段,每个段 1.8GB。在最后一次合并期间,所有内容都必须解压缩,因此每轮需要 1.8 * 10 GB = 18 GB。

于 2013-10-21T13:44:37.200 回答