I am trying to load a zstd-compressed JSON file (archived size 16.4 GB) using Spark 3.1.1 with Scala 2.12.10. See the sample file for reference.

For reference, my machine has 32 GB of RAM. The zstd decompressor I am using comes from the Hadoop native libraries:

LD_LIBRARY_PATH=/opt/hadoop/lib/native
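
A minimal way to verify the native bindings are actually visible (a sketch using Hadoop's NativeCodeLoader; hadoop checknative -a reports the same information) would be:

import org.apache.hadoop.util.NativeCodeLoader

// Both must be true for Hadoop's native zstd decompressor to be used.
println(s"libhadoop loaded: ${NativeCodeLoader.isNativeCodeLoaded}")
println(s"built with zstd:  ${NativeCodeLoader.buildSupportsZstd}")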

My configuration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SparkSession}

trait SparkProdContext {

  private val master = "local[*]"
  private val appName = "testing"
  private val conf: SparkConf = new SparkConf()
    .setMaster(master)
    .setAppName(appName)
    .set("spark.driver.allowMultipleContexts", "false")
    .set("spark.ui.enabled", "false")

  val ss: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  val sc: SparkContext = ss.sparkContext
  val sqlContext: SQLContext = ss.sqlContext

}

My code:

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection

import ss.implicits._

case class Comment(
  author: String,
  body: String,
  score: BigInt,
  subreddit_id: String,
  subreddit: String,
  id: String,
  parent_id: String,
  link_id: String,
  retrieved_on: BigInt,
  created_utc: BigInt,
  permalink: String
)

val schema = ScalaReflection.schemaFor[Comment].dataType.asInstanceOf[StructType]

val comments = ss.read.schema(schema).json("/home/user/Downloads/RC_2020-03.zst").as[Comment]
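
As an aside, the same StructType can be derived without the internal catalyst API; a sketch using the public Encoders (I assume the choice of derivation is not the issue here):

import org.apache.spark.sql.Encoders

// Public-API equivalent of the ScalaReflection call above: Encoders.product
// builds an Encoder[Comment], whose schema is the same StructType.
val schemaViaEncoder = Encoders.product[Comment].schema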

After running the code, I am getting the following error:

22/01/06 23:59:44 INFO CodecPool: Got brand-new decompressor [.zst]
22/01/06 23:59:44 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.InternalError: Frame requires too much memory for decoding
    at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.inflateBytesDirect(Native Method)
    at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.decompress(ZStandardDecompressor.java:181)
    at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
    at java.base/java.io.InputStream.read(InputStream.java:205)
    at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
    at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:69)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
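
In case it helps narrow things down: the message suggests the frame declares a decompression window larger than the native decoder is willing to allocate. An untested fallback sketch, assuming the standalone zstd CLI is installed, would be to decompress outside of Spark (the CLI accepts long-window frames via --long=31) and read the plain JSON lines instead:

import scala.sys.process._

// Untested sketch: shell out to the zstd CLI, which can decode long-window
// frames when passed --long=31, then read the decompressed JSON lines file.
val rc = Seq("zstd", "-d", "--long=31", "/home/user/Downloads/RC_2020-03.zst").!
require(rc == 0, s"zstd exited with code $rc")

// zstd strips the .zst suffix from the output name.
val commentsPlain = ss.read.schema(schema)
  .json("/home/user/Downloads/RC_2020-03")
  .as[Comment]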

Any ideas appreciated!
