在我的 MR 工作中,假设我指定了 map 或 reduce 输出到 LZO 的压缩,它是如何压缩的?是从 map 或 reduce 任务中获得的全部数据首先没有压缩,然后最后,未压缩的数据被压缩,还是被增量压缩和写入。如果它被增量压缩和写入,那么它是如何完成的?请帮助我理解这一点。
谢谢,
文卡特
在我的 MR 工作中,假设我指定了 map 或 reduce 输出到 LZO 的压缩,它是如何压缩的?是从 map 或 reduce 任务中获得的全部数据首先没有压缩,然后最后,未压缩的数据被压缩,还是被增量压缩和写入。如果它被增量压缩和写入,那么它是如何完成的?请帮助我理解这一点。
谢谢,
文卡特
我想我会通过解释压缩在更高级别上适合 mapreduce 管道的位置来为 Tariq 的答案添加更多细节。希望它是有帮助的。
如果您为映射阶段(mapreduce.map.output.compress=true
)指定压缩,则中间映射输出数据将使用您指定的任何编解码器(mapreduce.map.ouput.compress.codec=org.apache.hadoop.io.compress.*
)进行压缩,并在每个映射任务完成时(或者如果映射任务超出其序列化缓冲区,则更早)保存到磁盘限制,并开始溢出到磁盘)。然后在 mapreduce 作业的 Shuffle & Sort 阶段从磁盘读取压缩数据并发送到适当的节点。
在这个阶段(地图输出),压缩结果在可拆分方面没有任何好处,因此 GZIP 或 Snappy 编解码器以及 LZO 和 BZIP2 都值得尝试。GZIP 通常对大多数数据具有更好的压缩比,但会大量消耗 CPU,而 Snappy 速度更快但压缩比较低(即它要么具有较少的延迟,要么不像 GZIP 那样消耗大量 CPU……我不赞成原因)。使用 teragen 生成的数据,GZIP 与 Snappy 的压缩比分别为 3.5 倍和 2.5 倍。显然,您的数据和硬件限制将决定在您的情况下最有利的编解码器是什么。
shuffle & sort 阶段之前的压缩有助于减少磁盘 IO,并减少网络带宽,因为您通过网络发送压缩的数据。我想不出在这个阶段不压缩数据的充分理由,只要这样做的 CPU 资源不被争用。在我运行在 1 Gb 网络上的 10 节点小型 Hadoop 集群中,仅为映射输出阶段开启压缩(即在 shuffle & sort 阶段压缩之前的中间映射数据;最终输出未压缩)提高了整体工作时间41% (GZIP)和45% (Snappy)的 100GB terasort 作业与不使用压缩相比。这些实验中的数据是使用 teragen 生成的。当然,您的结果会因您的数据集、硬件和网络而异。
然后在缩减阶段开始时对压缩数据进行解压缩。
压缩在最终输出的缩减阶段结束时再次发挥作用(mapreduce.output.fileoutputformat.compress=true)。如果您将输出馈送到另一个 mapreduce 作业,这里是可拆分 LZO 或 BZIP2 压缩可能有用的地方。如果您不在输出上使用可拆分压缩编解码器并在该数据上运行作业,则只能使用单个映射器,这会破坏 Hadoop 的主要优势之一;并行化。解决此问题并使用 GZIP 编解码器之类的一种方法是为输出创建一个序列文件。序列文件是可拆分的,因为它本质上是一系列附加在一起的压缩文件。序列文件在每个文件附加到另一个文件的边界处是可拆分的。
它基本上取决于您使用的文件类型。如果它是文本文件,则压缩发生在文件级别。但如果是 SequenceFile,那么压缩可以是记录级别或块级别。请注意,这里的块是指使用序列文件的缓冲区,而不是 hdfs 块。
如果是块压缩,则一次将多条记录压缩到一个块中。记录被添加到一个块中,直到它达到以字节为单位的最小大小。通过从缓冲区大小中减去压缩算法的最大开销来计算一次要压缩的输入数据的最大大小。默认缓冲区大小为 512 字节,对于 zlib 算法,压缩开销为 18 字节(bufferSize 的 1% + 12 字节)。然后使用给定的输出流和压缩器创建 BlockCompressorStream,并写入压缩数据。
希望这在一定程度上回答了这个问题。