答案是“做不到”。至少,不适用于将 hadoop 流应用到源自 hadoop 之外的 snappy 压缩文件的特定情况。
我(彻底!)探索了两个主要选项来得出这个结论:(1)尝试使用 highcaffeinated 建议的 hadoop 的内置 snappy 压缩,或(2)编写我自己的流模块来使用和解压缩 snappy 文件。
对于选项 (1),似乎 hadoop 在使用 snappy 压缩文件时会向文件添加一些标记。由于我的文件是在 hadoop 外部使用 snappy 压缩的,所以 hadoop 的内置编解码器无法解压缩文件。
此问题的一个症状是堆空间错误:
2013-04-03 20:14:49,739 FATAL org.apache.hadoop.mapred.Child (main): Error running child : java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:102)
at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:82)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:76)
at java.io.InputStream.read(InputStream.java:85)
...
当我切换到一个更大的实例并启动 mapred.child.java.opts 设置时,我得到了一个新错误:
java.io.IOException: IO error in map input file s3n://my-bucket/my-file.snappy
Hadoop 的 snappy 编解码器不适用于外部生成的文件。
对于选项 (2),问题在于 hadoop 流不区分 \n、\r 和 \r\n 换行符。由于快速压缩最终会在整个压缩文件中散布这些字节码,因此这是致命的。这是我的错误跟踪:
2013-04-03 22:29:50,194 WARN org.apache.hadoop.mapred.Child (main): Error running child
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:372)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:586)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
...
通过对 hadoop 的 Java 类做一些工作(例如,参见这里),我们或许可以解决 \r 与 \n 的问题。但正如我最初所说,我的目标是在 hadoop 流模块中构建,而不涉及 Java。有了这个限制,似乎没有任何方法可以解决这个问题。
最后,我回到了生成这个集群正在消耗的文件的人那里,并说服他们切换到 gzip 或 lzo。
PS - 在选项 (2) 上,我尝试在不同的字符上拆分记录(例如 textinputformat.record.delimiter=X),但它感觉非常hacky 并且无论如何都不起作用。
PPS - 另一种解决方法是编写脚本以从 S3 下载文件,解压缩它们,然后运行 -copyFromLocal 将它们拉入 HDFS。在计算上,这并没有错,但从工作流程的角度来看,它会带来各种麻烦。