
I am loading around 4 GB of data from Parquet files into a Spark DataFrame. Loading takes a few hundred milliseconds. Then I register the DataFrame as a table so I can execute SQL queries:

sparkDF = sqlContext.read.parquet("<path>/*.parquet")
sparkDF.registerTempTable("sparkDF")

One of those queries, a selective query with 60 columns in the select list, fails with an out-of-memory exception:

spark.sql("select <60 columns list> from sessions where endtime >= '2019-07-01 00:00:00' and endtime < '2019-07-01 03:00:00' and id = '<uuid>'").show()
[Stage 12:>                                                      (0 + 36) / 211]
2019-09-16 21:18:45,583 ERROR executor.Executor: Exception in task 25.0 in stage 12.0 (TID 1608)
java.lang.OutOfMemoryError: Java heap space

When I remove some of the columns from the select list, the query executes successfully. I tried increasing spark.executor.memory and spark.driver.memory to about 16g, but that did not resolve the issue.
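The progress bar (0 + 36) / 211 indicates 36 tasks running at the same time, and they all draw on the same heap, so any extra memory is split 36 ways. As a rough back-of-the-envelope sketch (my own estimate, not an official Spark formula), the average per-task share can be computed from Spark's 300 MB of reserved memory and the default spark.memory.fraction of 0.6 (Spark 2.x). The remaining 40% is what the Spark tuning guide calls user memory; the Parquet decompression buffers in the stack trace are plain heap allocations that Spark's memory manager does not track. The helper heap_share_per_task is hypothetical. If this is local mode (an assumption; 36 concurrent tasks would fit a single 36-core machine), every task runs inside the driver JVM, so spark.executor.memory has no effect, and spark.driver.memory only applies if it is set before the JVM starts, for example with --driver-memory.

```python
# Rough estimate of the average heap available to each concurrently running
# task under Spark's unified memory model (Spark 2.x defaults assumed).
RESERVED_MB = 300  # memory Spark sets aside before sizing its pools


def heap_share_per_task(heap_mb, concurrent_tasks, memory_fraction=0.6):
    """Return the average per-task share (MB) of the unified pool
    (execution + storage) and of the remaining "user" memory.

    This is only an average: tasks do not get fixed slices of the heap.
    """
    if heap_mb <= RESERVED_MB:
        raise ValueError("heap must be larger than the reserved memory")
    if concurrent_tasks < 1:
        raise ValueError("need at least one task")
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction  # execution + storage
    user = usable - unified             # user data structures, metadata, etc.
    return {
        "unified_mb": unified / concurrent_tasks,
        "user_mb": user / concurrent_tasks,
    }


if __name__ == "__main__":
    # A 16 GB heap with 36 tasks in flight vs. the same heap with 8 tasks.
    for tasks in (36, 8):
        share = heap_share_per_task(16 * 1024, tasks)
        print(tasks, {k: round(v, 1) for k, v in share.items()})
```

With a 16 GB heap and 36 concurrent tasks, this works out to roughly 268 MB of unified memory and 179 MB of user memory per task on average; with 8 tasks the same heap gives each task 4.5 times as much. Reducing concurrency (for example local[8] instead of local[*]) raises the per-task share without adding memory.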

Then I upgraded Spark to the latest version, 2.4.4, and the query no longer fails.

However, with the same upgraded version, when I write the same DataFrame in Delta format, I get the same out-of-memory error:

sessions.write.format("delta").save("/usr/spark-2.4.4/data/data-delta/")
[Stage 5:>                                                        (0 + 36) / 37]
2019-09-18 18:58:04,362 ERROR executor.Executor: Exception in task 21.0 in stage 5.0 (TID 109)
java.lang.OutOfMemoryError: Java heap space
    at org.apache.hadoop.io.compress.DecompressorStream.<init>(DecompressorStream.java:64)
    at org.apache.hadoop.io.compress.DecompressorStream.<init>(DecompressorStream.java:71)
    at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.<init>(NonBlockedDecompressorStream.java:36)
    at org.apache.parquet.hadoop.codec.SnappyCodec.createInputStream(SnappyCodec.java:75)
    at org.apache.parquet.hadoop.CodecFactory$HeapBytesDecompressor.decompress(CodecFactory.java:109)
    at org.apache.parquet.hadoop.ColumnChunkPageReadStore$ColumnChunkPageReader$1.visit(ColumnChunkPageReadStore.java:93)
    at org.apache.parquet.hadoop.ColumnChunkPageReadStore$ColumnChunkPageReader$1.visit(ColumnChunkPageReadStore.java:88)
    at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:95)
    at org.apache.parquet.hadoop.ColumnChunkPageReadStore$ColumnChunkPageReader.readPage(ColumnChunkPageReadStore.java:88)
    at org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:532)
    at org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:525)
    at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:638)
    at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:353)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:80)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:75)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:232)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)

Any suggestions or improvements that would help resolve this problem are welcome.


3 Answers


In Spark 2.4.4, simply increasing the driver memory when launching the shell helped resolve the issue:

pyspark --packages io.delta:delta-core_2.11:0.3.0 --driver-memory 5g

answered 2019-09-18T19:53:53.860

Increasing the driver and executor memory is only a stopgap; the problem is also about parallelism. The driver does not need 16 GB of memory.

Instead of

spark.sql("select <60 columns list> from sessions where endtime >= '2019-07-01 00:00:00' and endtime < '2019-07-01 03:00:00' and id = ' '").show()

you should use

spark.sql("select * from sessions where endtime >= '2019-07-01 00:00:00' and endtime < '2019-07-01 03:00:00' and id = ''").show(60)
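The point about parallelism can be made more concrete. For DataFrame file scans, the number of scan tasks, and therefore how much Parquet data each task decompresses, is driven by spark.sql.files.maxPartitionBytes (default 128 MB) and spark.sql.files.openCostInBytes (default 4 MB). This is a simplified pure-Python model of the split sizing in Spark 2.4, written from my reading of FileSourceScanExec; it assumes every file is splittable and ignores Parquet row-group boundaries (a Parquet split only reads the row groups whose midpoint falls inside it), so real task counts and per-task sizes can differ. The function names are hypothetical.

```python
# Simplified model of how Spark 2.4 sizes and packs file splits for a
# DataFrame file scan. Assumes all files are splittable; Parquet row-group
# boundaries are ignored.
MB = 1024 * 1024


def max_split_bytes(file_sizes, default_parallelism,
                    max_partition_bytes=128 * MB, open_cost=4 * MB):
    """Target split size: maxPartitionBytes, lowered to the bytes each core
    would get, but never below openCostInBytes."""
    total = sum(size + open_cost for size in file_sizes)
    bytes_per_core = total // default_parallelism
    return min(max_partition_bytes, max(open_cost, bytes_per_core))


def num_partitions(file_sizes, default_parallelism,
                   max_partition_bytes=128 * MB, open_cost=4 * MB):
    """Number of scan tasks, using "next fit decreasing" packing."""
    split = max_split_bytes(file_sizes, default_parallelism,
                            max_partition_bytes, open_cost)
    # Cut each file into pieces of at most `split` bytes.
    pieces = []
    for size in file_sizes:
        for offset in range(0, size, split):
            pieces.append(min(split, size - offset))
    pieces.sort(reverse=True)

    partitions, current_size, current_count = 0, 0, 0
    for piece in pieces:
        # Close the current partition if this piece would overflow it.
        if current_count > 0 and current_size + piece > split:
            partitions += 1
            current_size, current_count = 0, 0
        # Every piece also pays the (estimated) cost of opening a file.
        current_size += piece + open_cost
        current_count += 1
    if current_count > 0:
        partitions += 1
    return partitions
```

In this model, four 1 GB files on 36 cores yield 36 tasks of roughly 114 MB each, while lowering the limit (for example spark.conf.set("spark.sql.files.maxPartitionBytes", "33554432")) yields 128 tasks of 32 MB. Smaller tasks hold less data at once, but the number of tasks running concurrently is still set by the available cores.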

answered 2019-09-19T05:02:22.683

You can increase the amount of RAM the JVM can use. The relevant VM options are:

 -Xms: sets the initial (minimum) heap size. Syntax: -Xms2048m (2 GB of memory)

 -Xmx: sets the maximum heap size. Syntax: -Xmx2048m
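The -Xms/-Xmx values are a number with an optional k, m or g suffix (multiples of 1024), so -Xms2048m is 2 GiB. The hypothetical helper below parses such values and rejects an initial heap larger than the maximum, which the JVM refuses at startup. For Spark specifically, the driver's -Xmx is normally derived from spark.driver.memory (the --driver-memory flag used in the first answer); spark-submit rejects an -Xmx placed in spark.driver.extraJavaOptions, so the memory options are the supported way to size the driver heap.

```python
# Hypothetical helpers: parse JVM heap sizes such as "2048m" or "5g"
# (k/m/g suffixes, case-insensitive, as documented for -Xms/-Xmx).
import re

_UNITS = {"": 1, "k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}


def parse_jvm_size(value):
    """Convert a size such as '2048m' or '5g' to bytes."""
    match = re.fullmatch(r"(\d+)([kKmMgG]?)", value.strip())
    if match is None:
        raise ValueError(f"not a JVM size: {value!r}")
    number, unit = match.groups()
    return int(number) * _UNITS[unit.lower()]


def check_heap_flags(xms, xmx):
    """Validate a pair like ('-Xms2048m', '-Xmx4g'); return (initial, max) in bytes."""
    if not (xms.startswith("-Xms") and xmx.startswith("-Xmx")):
        raise ValueError("expected -Xms<size> and -Xmx<size>")
    initial, maximum = parse_jvm_size(xms[4:]), parse_jvm_size(xmx[4:])
    if initial > maximum:
        # The JVM would refuse to start with this combination.
        raise ValueError("initial heap size is larger than maximum heap size")
    return initial, maximum
```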

I'm not sure whether this will solve your problem, but it is worth a try.

answered 2019-09-18T19:06:35.810