I have 1.2 GB of ORC data on S3, and I am trying to do the following with it:
1) Cache the data on a Snappy cluster [SnappyData 0.9]
2) Run a group-by query against the cached dataset
3) Compare the performance with Spark 2.0.0
I am using a 64 GB / 8-core machine, and the Snappy cluster is configured as follows:
$ cat locators
localhost
$ cat leads
localhost -heap-size=4096m -spark.executor.cores=1
$ cat servers
localhost -heap-size=6144m
localhost -heap-size=6144m
localhost -heap-size=6144m
localhost -heap-size=6144m
localhost -heap-size=6144m
localhost -heap-size=6144m
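For reference, that hands out roughly 40 GB of heap to the server and lead processes on the 64 GB machine (plain arithmetic, nothing SnappyData-specific):
servers_mb = 6 * 6144   # six server entries at 6144m each
lead_mb = 4096          # single lead at 4096m
print((servers_mb + lead_mb) / 1024.0)   # ~40 GB in total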
Now, I have written a small Python script that caches the ORC data from S3 and runs a simple group-by query, shown below:
from pyspark.sql.snappy import SnappyContext
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('snappy_sample')
sc = SparkContext(conf=conf)
sqlContext = SnappyContext(sc)

# Register the ORC data on S3 as an external table, cache it, then query it.
sqlContext.sql("CREATE EXTERNAL TABLE if not exists my_schema.my_table using orc options(path 's3a://access_key:secret_key@bucket_name/path')")
sqlContext.cacheTable("my_schema.my_table")
out = sqlContext.sql("select * from my_schema.my_table where (WeekId = '1') order by sum_viewcount desc limit 25")
out.collect()
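For the Spark 2.0.0 comparison in step 3, the plan is simply to wall-clock the same query in both setups with a small helper along these lines (illustrative only, not part of the failing script above):
import time

def timed_collect(ctx, query):
    # Run a SQL query through the given context and report wall-clock latency.
    start = time.time()
    rows = ctx.sql(query).collect()
    print("%d rows in %.1f s" % (len(rows), time.time() - start))
    return rows

# e.g. timed_collect(sqlContext, "select * from my_schema.my_table where (WeekId = '1') order by sum_viewcount desc limit 25")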
The script above is run with the following command:
spark-submit --master local[*] snappy_sample.py
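With --master local[*] the whole job, including the caching, runs inside a single driver JVM whose heap defaults to 1 GB unless raised explicitly; a variant with a larger driver heap would look like the line below (the 10g figure is only an illustration):
spark-submit --master local[*] --driver-memory 10g snappy_sample.py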
I get the following error:
17/10/04 02:50:32 WARN memory.MemoryStore: Not enough space to cache rdd_2_5 in memory! (computed 21.2 MB so far)
17/10/04 02:50:32 WARN memory.MemoryStore: Not enough space to cache rdd_2_0 in memory! (computed 21.2 MB so far)
17/10/04 02:50:32 WARN storage.BlockManager: Persisting block rdd_2_5 to disk instead.
17/10/04 02:50:32 WARN storage.BlockManager: Persisting block rdd_2_0 to disk instead.
17/10/04 02:50:47 WARN storage.BlockManager: Putting block rdd_2_2 failed due to an exception
17/10/04 02:50:47 WARN storage.BlockManager: Block rdd_2_2 could not be removed as it was not found on disk or in memory
17/10/04 02:50:47 ERROR executor.Executor: Exception in task 2.0 in stage 0.0 (TID 2)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:96)
at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:97)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:135)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:134)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:134)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:98)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:232)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:331)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
at org.apache.spark.sql.execution.WholeStageCodegenRDD.compute(WholeStageCodegenExec.scala:496)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
17/10/04 02:50:47 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-2,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:96)
at org.apache.spark.sql.execution.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:97)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:135)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1$$anonfun$next$2.apply(InMemoryRelation.scala:134)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:134)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:98)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:232)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:331)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
at org.apache.spark.sql.execution.WholeStageCodegenRDD.compute(WholeStageCodegenExec.scala:496)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:320)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:284)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
17/10/04 02:50:48 INFO snappystore: VM is exiting - shutting down distributed system
Apart from the above error, how can I check whether the data has actually been cached in the Snappy cluster?
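With a plain SQLContext the obvious check would be something like the snippet below, but I am not sure whether isCached reflects what the Snappy cluster actually holds:
# isCached() is the stock SQLContext call; it is unclear to me whether it
# reports the cluster-side state for SnappyData, hence the question.
print(sqlContext.isCached("my_schema.my_table"))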