3

背景:

我们有一个项目使用 spark 处理一些 log/csv 文件,每个文件的大小都非常大,例如 20GB。

所以我们需要压缩log/csv文件

例子

HDFS 块大小:128M,我们有一个 1GB 的日志文件。

如果文件未压缩,HDFS 中将有 8 个块

var rddFlat = sc.textFile("hdfs:///tmp/test.log")

rddFlat.partition.length 将为 8(因为将有 8 个输入拆分)

如果使用bzip2,假设压缩后压缩后大小为256MB(实际上bz压缩率很高),会有2个块

var rddCompress = sc.textFile("hdfs:///tmp/test.log.bz2")

rddCompress.partition.length 将为 2(对吗?)

如果我们有以下变换和动作

var cnFlat = rddFlat.map(x => x.contains("error")).count();
var cnCompress = rddCompress.map(x => x.contains("error")).count();

我的疑惑

(压缩文件的HDFS块、输入分割和spark分区的关系,可分割和不可分割压缩)

  1. spark如何处理压缩分区?

    spark的每个执行者将他们分配的分区解压缩到spark块中并对块进行转换和操作?

  2. 如果我们删除解压缩时间,哪个更慢?

    cnCompress 计算会更慢吗?因为只有 2 个分区,只有两个节点会进行转换和操作。cnFlat 有 8 个分区。

  3. 在选择压缩编解码器(可拆分或不可拆分)时,我们是否需要考虑压缩后的大小?

    压缩后,如果压缩后的大小小于或等于 HDFS 块大小。在splittable方面,我们选择splittable还是不splittable压缩编解码器没有意义,因为spark RDD只会有一个分区(我的意思是只有一个worker会处理rdd)?

4

0 回答 0