5

我正在对存储在 S3 中的一些 LZO 压缩日志文件运行 EMR Spark 作业。有几个日志文件存储在同一个文件夹中,例如:

...
s3://mylogfiles/2014-08-11-00111.lzo
s3://mylogfiles/2014-08-11-00112.lzo
...

在 spark-shell 中,我正在运行一项计算文件中行数的作业。如果我为每个文件单独计算行数,则没有问题,例如:

// Works fine
...
sc.textFile("s3://mylogfiles/2014-08-11-00111.lzo").count()
sc.textFile("s3://mylogfiles/2014-08-11-00112.lzo").count()
...

如果我使用通配符以单行方式加载所有文件,则会出现两种异常。

// One-liner throws exceptions
sc.textFile("s3://mylogfiles/*.lzo").count()

例外情况是:

java.lang.InternalError: lzo1x_decompress_safe returned: -6
    at com.hadoop.compression.lzo.LzoDecompressor.decompressBytesDirect(Native Method)

java.io.IOException: Compressed length 1362309683 exceeds max block size 67108864 (probably corrupt file)
    at com.hadoop.compression.lzo.LzopInputStream.getCompressedData(LzopInputStream.java:291)

在我看来,解决方案是由最后一个例外给出的文本暗示的,但我不知道如何继续。LZO 文件的大小是否有限制,或者有什么问题?

我的问题是:我可以运行将所有 LZO 压缩文件加载到 S3 文件夹中的 Spark 查询,而不会出现与 I/O 相关的异常吗?

有 66 个文件,每个文件大约 200MB。

编辑:只有在使用 Hadoop2 核心库(ami 3.1.0)运行 Spark 时才会发生异常。使用 Hadoop1 核心库(ami 2.4.5)运行时,一切正常。这两种情况都使用 Spark 1.0.1 进行了测试。

4

3 回答 3

5

kgeyti 的回答很好,但是:

LzoTextInputFormat引入了性能损失,因为它会检查每个 LZO 文件的 .index 文件。这对于 S3 上的许多 LZO 文件尤其痛苦(我经历了长达几分钟的延迟,这是由对 S3 的数千个请求引起的)。

如果您事先知道您的 LZO 文件不可拆分,则更高效的解决方案是创建自定义的不可拆分输入格式:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

class NonSplittableTextInputFormat extends TextInputFormat {
    override def isSplitable(context: JobContext, file: Path): Boolean = false
}

并阅读这样的文件:

context.newAPIHadoopFile("s3://mylogfiles/*.lzo",
  classOf[NonSplittableTextInputFormat],
  classOf[org.apache.hadoop.io.LongWritable],
  classOf[org.apache.hadoop.io.Text])
.map(_._2.toString)
于 2014-10-02T20:02:25.597 回答
4

昨天我们在 EMR 集群上部署了 Hive,并且在 S3 中的一些 LZO 文件遇到了同样的问题,这些文件已被另一个非 EMR 集群毫无问题地采取。在对日志进行了一些挖掘之后,我注意到 map 任务以 250MB 的块读取 S3 文件,尽管这些文件绝对不可拆分

原来,参数mapreduce.input.fileinputformat.split.maxsize设置为250000000~250MB。这导致 LZO 从文件中打开流,最终导致 LZO 块损坏。

我将参数mapreduce.input.fileinputformat.split.maxsize=2000000000设置为更大的输入数据的最大文件大小,现在一切正常。

我不确定这与 Spark 有什么关系,但更改 InputFormat 可能会有所帮助,这似乎首先是问题所在,正如在Amazon EMR Hive 如何与 Apache Hive 不同中提到的那样。

于 2014-08-13T21:32:36.997 回答
4

我自己还没有遇到这个具体问题,但它看起来像.textFile期望文件是可拆分的,就像 Cedrik 的 Hive 坚持使用的问题一样CombineFileInputFormat

您可以索引您的 lzo 文件,或者尝试使用LzoTextInputFormat- 我很想知道它是否在 EMR 上效果更好:

sc.newAPIHadoopFile("s3://mylogfiles/*.lz", 
    classOf[com.hadoop.mapreduce.LzoTextInputFormat],
    classOf[org.apache.hadoop.io.LongWritable],
    classOf[org.apache.hadoop.io.Text])
  .map(_._2.toString) // if you just want a RDD[String] without writing a new InputFormat
  .count
于 2014-08-15T15:14:31.523 回答