所以人们在压缩 Scalding Jobs 的输出时遇到了问题,包括我自己。谷歌搜索后,我在某个不起眼的论坛上得到了奇怪的答案,但没有适合人们复制和粘贴需求的东西。
我想要一个类似的输出Tsv
,但会写入压缩输出。
所以人们在压缩 Scalding Jobs 的输出时遇到了问题,包括我自己。谷歌搜索后,我在某个不起眼的论坛上得到了奇怪的答案,但没有适合人们复制和粘贴需求的东西。
我想要一个类似的输出Tsv
,但会写入压缩输出。
无论如何,经过大量的修改后,我设法编写了一个似乎可以完成工作的 TsvCompressed 输出(您仍然需要设置 hadoop 作业系统配置属性,即将压缩设置为 true,并将编解码器设置为合理的值,或者默认为糟糕的放气)
import com.twitter.scalding._
import cascading.tuple.Fields
import cascading.scheme.local
import cascading.scheme.hadoop.{TextLine, TextDelimited}
import cascading.scheme.Scheme
import org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf}
case class TsvCompressed(p: String) extends FixedPathSource(p) with DelimitedSchemeCompressed
trait DelimitedSchemeCompressed extends Source {
val types: Array[Class[_]] = null
override def localScheme = new local.TextDelimited(Fields.ALL, false, false, "\t", types)
override def hdfsScheme = {
val temp = new TextDelimited(Fields.ALL, false, false, "\t", types)
temp.setSinkCompression(TextLine.Compress.ENABLE)
temp.asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
}
}
我还有一个小项目,展示如何从Tsv
. 字数压缩。
Scalding 设置null
为禁用压缩的 CascadingTextDelimeted
参数。