7

我有一个用 bzip2 压缩的维基百科转储(从http://dumps.wikimedia.org/enwiki/下载),但我不想解压缩它:我想在动态解压缩时处理它。

我知道可以在纯 Java 中执行此操作(参见例如Java - Read BZ2 file and uncompress/parse on the fly),但我想知道如何在 Apache Flink 中执行此操作?我可能需要的是类似https://github.com/whym/wikihadoop但对于 Flink,而不是 Hadoop。

4

1 回答 1

6

在 Apache Flink 中可以读取以下格式的压缩文件:

org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec

从包名中可以看出,Flink 使用 Hadoop 的 InputFormats 来做到这一点。这是使用 Flink 的 Scala API 读取文件的示例gz:(您至少需要 Flink 0.8.1)

def main(args: Array[String]) {

  val env = ExecutionEnvironment.getExecutionEnvironment
  val job = new JobConf()
  val hadoopInput = new TextInputFormat()
  FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
  val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)

  lines.print

  env.execute("Read gz files")
}

Apache Flink 仅内置对.deflate文件的支持。添加对更多压缩编解码器的支持很容易,但还没有完成。

将 HadoopInputFormats 与 Flink 一起使用不会导致任何性能损失。Flink 内置了对 HadoopWritable类型的序列化支持。

于 2015-04-03T11:49:20.777 回答