123

根据Hadoop - The Definitive Guide

FileInputFormats 定义的逻辑记录通常不适合 HDFS 块。例如,TextInputFormat 的逻辑记录是行,通常会跨越 HDFS 边界。这与您的程序的功能无关——例如,行不会丢失或损坏——但值得了解,因为它确实意味着数据本地映射(即,与它们在同一主机上运行的映射)输入数据)将执行一些远程读取。这导致的轻微开销通常并不显着。

假设一条记录线被分成两个块(b1 和 b2)。处理第一个块 (b1) 的映射器将注意到最后一行没有 EOL 分隔符,并从下一个数据块 (b2) 中获取该行的剩余部分。

处理第二个块(b2)的映射器如何确定第一个记录不完整,应该从块(b2)中的第二个记录开始处理?

4

6 回答 6

163

有趣的问题,我花了一些时间查看代码的细节,这是我的想法。拆分由客户端 by 处理InputFormat.getSplits,因此查看 FileInputFormat 会提供以下信息:

  • 对于每个输入文件,获取文件长度、块大小并计算分割大小,其中max(minSize, min(maxSize, blockSize))对应maxSizemapred.max.split.size和。minSizemapred.min.split.size
  • FileSplit根据上面计算的分割大小将文件分成不同的s。这里重要的是每个都使用与输入文件中的偏移量相对应FileSplit的参数进行初始化start。那时仍然没有处理线。代码的相关部分如下所示:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

之后,如果您查看LineRecordReader由 定义的TextInputFormat,那是处理行的地方:

  • 当您初始化时LineRecordReader,它会尝试实例化 a LineReader,这是一种能够读取行的抽象FSDataInputStream。有2种情况:
  • 如果有CompressionCodec定义,那么这个编解码器负责处理边界。可能与您的问题无关。
  • 但是,如果没有编解码器,这就是有趣的地方:如果startInputSplit的 不同于 0,那么您回溯 1 个字符,然后跳过您遇到的由 \n 或 \r\n (Windows) 标识的第一行!回溯很重要,因为如果您的行边界与分割边界相同,这可以确保您不会跳过有效行。以下是相关代码:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

因此,由于拆分是在客户端计算的,因此映射器不需要按顺序运行,每个映射器都已经知道它是否需要丢弃第一行。

因此,基本上,如果您在同一个文件中有 2 行,每个 100Mb,为了简化,假设拆分大小为 64Mb。然后在计算输入拆分时,我们将有以下场景:

  • 拆分 1 包含此块的路径和主机。在开始时初始化 200-200=0Mb,长度 64Mb。
  • 拆分 2 在开始时初始化 200-200+64=64Mb,长度 64Mb。
  • 拆分 3 在开始时初始化 200-200+128=128Mb,长度 64Mb。
  • 拆分 4 在开始时初始化 200-200+192=192Mb,长度 8Mb。
  • 映射器 A 将处理拆分 1,开始为 0,因此不要跳过第一行,并读取超过 64Mb 限制的整行,因此需要远程读取。
  • Mapper B 将处理 split 2,start 是 != 0 所以跳过 64Mb-1byte 之后的第一行,这对应于第 1 行的末尾 100Mb 仍然在 split 2 中,我们在 split 2 中有 28Mb 的行,所以远程读取剩余的 72Mb。
  • Mapper C 将处理拆分 3,开始为 != 0 所以跳过 128Mb-1byte 之后的第一行,这对应于 200Mb 处的第 2 行的结尾,这是文件的结尾,所以不要做任何事情。
  • 映射器 D 与映射器 C 相同,只是它在 192Mb-1byte 之后查找换行符。
于 2013-01-26T18:44:05.087 回答
18

Map Reduce算法不适用于文件的物理块。它适用于逻辑输入拆分。输入拆分取决于记录的写入位置。一条记录可能跨越两个映射器。

HDFS的设置方式是将非常大的文件分解为大块(例如,测量 128MB),并将这些块的三个副本存储在集群的不同节点上。

HDFS 不知道这些文件的内容。一条记录可能已在Block-a中开始,但该记录的结尾可能出现在Block-b中。

为了解决这个问题,Hadoop 使用存储在文件块中的数据的逻辑表示,称为输入拆分。当 MapReduce 作业客户端计算输入拆分时,它会计算出块中第一条完整记录的开始位置和块中最后一条记录的结束位置

关键点:

如果块中的最后一条记录不完整,则输入拆分包括下一个块的位置信息和完成记录所需的数据的字节偏移量。

看看下面的图表。

在此处输入图像描述

看看这篇文章和相关的 SE 问题:关于 Hadoop/HDFS 文件拆分

可以从文档中阅读更多详细信息

Map-Reduce 框架依赖作业的 InputFormat 来:

  1. 验证作业的输入规范。
  2. 将输入文件拆分为逻辑 InputSplits,然后将每个输入文件分配给单独的 Mapper。
  3. 然后将每个 InputSplit 分配给一个单独的 Mapper 进行处理。Split 可以是 tupleInputSplit[] getSplits(JobConf job,int numSplits) 是处理这些事情的 API。

FileInputFormat,它扩展了InputFormat实现的getSplits() 方法。在grepcode中查看此方法的内部结构

于 2016-01-12T06:39:46.950 回答
7

我认为它如下: InputFormat 负责将数据拆分为逻辑拆分,同时考虑到数据的性质。
没有什么能阻止它这样做,尽管它会为作业增加显着的延迟 - 所有逻辑和围绕所需拆分大小边界的读取都将在 jobtracker 中发生。
最简单的记录感知输入格式是 TextInputFormat。它的工作方式如下(据我从代码中了解)-输入格式按大小创建拆分,无论行如何,但 LineRecordReader 始终:
a)跳过拆分中的第一行(或其中的一部分),如果不是第一次拆分
b) 最后读取拆分边界后的一行(如果数据可用,则不是最后一次拆分)。

于 2013-01-12T09:49:55.030 回答
3

据我了解,当FileSplit为第一个块初始化时,会调用默认构造函数。因此 start 和 length 的值最初为零。在第一个块的处理结束时,如果最后一行不完整,那么长度的值将大于分割的长度,它也会读取下一个块的第一行。因此,第一个块的 start 值将大于零,在这种情况下,LineRecordReader将跳过第二个块的第一行。(见来源

如果第一个块的最后一行是完整的,那么 length 的值将等于第一个块的长度,而第二个块的 start 值将为零。在这种情况下,LineRecordReader将不会跳过第一行并从头开始读取第二个块。

说得通?

于 2013-01-23T13:29:48.297 回答
1

从 LineRecordReader.java 的 hadoop 源代码构造函数:我找到了一些评论:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

由此我相信hadoop会为每个拆分读取一个额外的行(在当前拆分结束时,在下一个拆分中读取下一行),如果不是第一次拆分,第一行将被丢弃。以免线路记录丢失和不完整

于 2015-01-27T16:18:47.033 回答
0

映射器不必通信。文件块在 HDFS 中,并且当前映射器(RecordReader)可以读取具有该行剩余部分的块。这发生在幕后。

于 2014-04-06T00:01:42.063 回答