22

与我的另一个问题相关,但不同:

someMap.saveAsTextFile("hdfs://HOST:PORT/out")

如果我将 RDD 保存到 HDFS,我如何告诉 spark 用 gzip 压缩输出?在 Hadoop 中,可以设置

mapred.output.compress = true

并选择压缩算法

mapred.output.compression.codec = <<classname of compression codec>>

我将如何在火花中做到这一点?这也会起作用吗?

编辑:使用 spark-0.7.2

4

4 回答 4

21

该方法saveAsTextFile需要使用编解码器类的附加可选参数。因此,对于您的示例,使用 gzip 应该是这样的:

someMap.saveAsTextFile("hdfs://HOST:PORT/out", classOf[GzipCodec])

更新

由于您使用的是 0.7.2,因此您可以通过在启动时设置的配置选项来移植压缩代码。我不确定这是否会完全有效,但你需要从这个开始:

conf.setCompressMapOutput(true)
conf.set("mapred.output.compress", "true")
conf.setMapOutputCompressorClass(c)
conf.set("mapred.output.compression.codec", c.getCanonicalName)
conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)

像这样:

System.setProperty("spark.hadoop.mapred.output.compress", "true")
System.setProperty("spark.hadoop.mapred.output.compression.codec", "true")
System.setProperty("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
System.setProperty("spark.hadoop.mapred.output.compression.type", "BLOCK")

如果你让它工作,发布你的配置可能也会对其他人有所帮助。

于 2013-06-21T17:42:05.083 回答
2

将 gzip 文件保存到 HDFS 或 Amazon S3 目录系统的另一种方法是使用saveAsHadoopFile方法。

someMap 是 RDD[(K,V)],如果你有 someMap 作为 RDD[V],你可以调用 someMap.map(line=>(line, "") 来使用 saveAsHadoopFile 方法。

import org.apache.hadoop.io.compress.GzipCodec

someMap.saveAsHadoopFile(output_folder_path, classOf[String], classOf[String], classOf[MultipleTextOutputFormat[String, String]], classOf[GzipCodec])
于 2015-05-04T20:59:41.613 回答
1

对于较新的 Spark 版本,请在 spark-defaults.xml 文件中执行以下操作。(mapred已弃用)。

<property>
    <name>mapreduce.output.fileoutputformat.compress</name>
    <value>true</value>
</property>
<property>
    <name>mapreduce.output.fileoutputformat.compress.codec</name>
    <value>GzipCodec</value>
</property>
<property>
    <name>mapreduce.output.fileoutputformat.compress.type</name>
    <value>BLOCK</value>
</property>
于 2016-08-18T21:21:37.257 回答
1

这是对所有大多数版本的 spark 快速进行压缩的最简单/最短的方法。

import org.apache.hadoop.io.SequenceFile.CompressionType

 /**
   * Set compression configurations to Hadoop `Configuration`.
   * `codec` should be a full class path
   */
  def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
    if (codec != null) {
      conf.set("mapreduce.output.fileoutputformat.compress", "true")
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) // "BLOCK" as string
      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
      conf.set("mapreduce.map.output.compress", "true")
      conf.set("mapreduce.map.output.compress.codec", codec)
    } else {
      // This infers the option `compression` is set to `uncompressed` or `none`.
      conf.set("mapreduce.output.fileoutputformat.compress", "false")
      conf.set("mapreduce.map.output.compress", "false")
    }
  }

conf在哪里spark.sparkContext.hadoopConfiguration

codec上述方法中的字符串参数选项为

 1.none 
 2.uncompressed 
 3.bzip2 
 4.deflate 
 5.gzip 
 6.lz4 
 7.snappy
于 2020-05-12T15:48:38.260 回答