I'm trying to load a zstd-compressed JSON file (archive size 16.4 GB) using Spark 3.1.1 with Scala 2.12.10. See the sample file for reference.
For reference, my machine has 32 GB of RAM. The zstd decompressor I'm using comes from the Hadoop native libraries at
LD_LIBRARY_PATH=/opt/hadoop/lib/native
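(For completeness, the same native path expressed as Spark properties would look roughly like the sketch below; in my run I only export LD_LIBRARY_PATH, so this snippet is illustrative rather than part of my actual setup.)

import org.apache.spark.SparkConf

// Sketch only: handing the same native-library path to Spark through its own
// properties; "/opt/hadoop/lib/native" matches the LD_LIBRARY_PATH above.
val nativeLibConf = new SparkConf()
  .set("spark.driver.extraLibraryPath", "/opt/hadoop/lib/native")
  .set("spark.executor.extraLibraryPath", "/opt/hadoop/lib/native")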
My configuration:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SparkSession}

trait SparkProdContext {
  private val master = "local[*]"
  private val appName = "testing"

  private val conf: SparkConf = new SparkConf()
    .setMaster(master)
    .setAppName(appName)
    .set("spark.driver.allowMultipleContexts", "false")
    .set("spark.ui.enabled", "false")

  val ss: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  val sc: SparkContext = ss.sparkContext
  val sqlContext: SQLContext = ss.sqlContext
}
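The reading code below lives in an object that mixes in this trait, which is why `import ss.implicits._` resolves. Roughly like this (the object name is illustrative, not my real project structure):

// Sketch only: the trait's SparkSession `ss` becomes available to the code below.
object LoadComments extends SparkProdContext {
  // ... reading code from the next snippet ...
}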
My code:
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection
import ss.implicits._

case class Comment(
  author: String,
  body: String,
  score: BigInt,
  subreddit_id: String,
  subreddit: String,
  id: String,
  parent_id: String,
  link_id: String,
  retrieved_on: BigInt,
  created_utc: BigInt,
  permalink: String
)

val schema = ScalaReflection.schemaFor[Comment].dataType.asInstanceOf[StructType]

val comments = ss.read
  .schema(schema)
  .json("/home/user/Downloads/RC_2020-03.zst")
  .as[Comment]
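The failure only surfaces once an action forces the file to be scanned; count() below is just an illustrative action, not necessarily the exact one in my real code:

// Illustrative action only: forcing a full scan triggers the decompression
// (and the error below); count() stands in for whatever action is actually run.
val total = comments.count()
println(s"loaded $total comments")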
After running the code, I get the following error.
22/01/06 23:59:44 INFO CodecPool: Got brand-new decompressor [.zst]
22/01/06 23:59:44 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.InternalError: Frame requires too much memory for decoding
at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.inflateBytesDirect(Native Method)
at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.decompress(ZStandardDecompressor.java:181)
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
at java.base/java.io.InputStream.read(InputStream.java:205)
at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:69)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
Any ideas appreciated!