背景:
我们有一个项目使用 spark 处理一些 log/csv 文件,每个文件的大小都非常大,例如 20GB。
所以我们需要压缩log/csv文件
例子
HDFS 块大小:128M,我们有一个 1GB 的日志文件。
如果文件未压缩,HDFS 中将有 8 个块
var rddFlat = sc.textFile("hdfs:///tmp/test.log")
rddFlat.partition.length 将为 8(因为将有 8 个输入拆分)
如果使用bzip2,假设压缩后压缩后大小为256MB(实际上bz压缩率很高),会有2个块
var rddCompress = sc.textFile("hdfs:///tmp/test.log.bz2")
rddCompress.partition.length 将为 2(对吗?)
如果我们有以下变换和动作
var cnFlat = rddFlat.map(x => x.contains("error")).count();
var cnCompress = rddCompress.map(x => x.contains("error")).count();
我的疑惑
(压缩文件的HDFS块、输入分割和spark分区的关系,可分割和不可分割压缩)
spark如何处理压缩分区?
spark的每个执行者将他们分配的分区解压缩到spark块中并对块进行转换和操作?
如果我们删除解压缩时间,哪个更慢?
cnCompress 计算会更慢吗?因为只有 2 个分区,只有两个节点会进行转换和操作。cnFlat 有 8 个分区。
在选择压缩编解码器(可拆分或不可拆分)时,我们是否需要考虑压缩后的大小?
压缩后,如果压缩后的大小小于或等于 HDFS 块大小。在splittable方面,我们选择splittable还是不splittable压缩编解码器没有意义,因为spark RDD只会有一个分区(我的意思是只有一个worker会处理rdd)?