5

我在 S3 中有一堆经过快速压缩的服务器日志,我需要使用 Elastic MapReduce 上的流处理它们。我如何告诉 Amazon 和 Hadoop 日志已经压缩(在它们被拉入 HFS 之前!),以便可以在发送到流式映射器脚本之前对其进行解压缩?

我能找到的唯一文档是:http: //docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/HadoopDataCompression.html#emr-using-snappy ,它似乎指的是中间压缩,而不是文件当它们到达 HFS 时被压缩。

顺便说一句,我主要在 python 中工作,所以如果你有 boto 的解决方案,那就加分吧!

4

2 回答 2

7

答案是“做不到”。至少,不适用于将 hadoop 流应用到源自 hadoop 之外的 snappy 压缩文件的特定情况。

我(彻底!)探索了两个主要选项来得出这个结论:(1)尝试使用 highcaffeinated 建议的 hadoop 的内置 snappy 压缩,或(2)编写我自己的流模块来使用和解压缩 snappy 文件。

对于选项 (1),似乎 hadoop 在使用 snappy 压缩文件时会向文件添加一些标记。由于我的文件是在 hadoop 外部使用 snappy 压缩的,所以 hadoop 的内置编解码器无法解压缩文件。

此问题的一个症状是堆空间错误:

2013-04-03 20:14:49,739 FATAL org.apache.hadoop.mapred.Child (main): Error running child : java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:102)
    at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:82)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:76)
    at java.io.InputStream.read(InputStream.java:85)
    ...

当我切换到一个更大的实例并启动 mapred.child.java.opts 设置时,我得到了一个新错误:

java.io.IOException: IO error in map input file s3n://my-bucket/my-file.snappy

Hadoop 的 snappy 编解码器不适用于外部生成的文件。

对于选项 (2),问题在于 hadoop 流不区分 \n、\r 和 \r\n 换行符。由于快速压缩最终会在整个压缩文件中散布这些字节码,因此这是致命的。这是我的错误跟踪:

2013-04-03 22:29:50,194 WARN org.apache.hadoop.mapred.Child (main): Error running child
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:372)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:586)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
    ...

通过对 hadoop 的 Java 类做一些工作(例如,参见这里),我们或许可以解决 \r 与 \n 的问题。但正如我最初所说,我的目标是在 hadoop 流模块中构建,而不涉及 Java。有了这个限制,似乎没有任何方法可以解决这个问题。

最后,我回到了生成这个集群正在消耗的文件的人那里,并说服他们切换到 gzip 或 lzo。

PS - 在选项 (2) 上,我尝试在不同的字符上拆分记录(例如 textinputformat.record.delimiter=X),但它感觉非常hacky 并且无论如何都不起作用。

PPS - 另一种解决方法是编写脚本以从 S3 下载文件,解压缩它们,然后运行 ​​-copyFromLocal 将它们拉入 HDFS。在计算上,这并没有错,但从工作流程的角度来看,它会带来各种麻烦。

于 2013-04-04T20:31:58.787 回答
1

Assuming you are using TextInputFormat (or one of its subclasses), compressed input files with .snappy extension are handled automatically.

You might want to consider using lzo compression (.gz extenstion) instead of snappy. You give up some compression speed for better compression ratio and an input file that is splittable. Cloudera mentions this in their blog:

One thing to note is that Snappy is intended to be used with a container format, like Sequence Files or Avro Data Files, rather than being used directly on plain text, for example, since the latter is not splittable and can’t be processed in parallel using MapReduce. This is different to LZO, where is is possible to index LZO compressed files to determine split points so that LZO files can be processed efficiently in subsequent processing.

于 2013-03-31T23:00:46.820 回答