我们的主要目标是要对大量输入数据(大约 80 GB)执行操作。问题是即使对于较小的数据集,我们也经常会遇到 java 堆空间或其他与内存相关的错误。
我们的临时解决方案是简单地指定更高的最大堆大小(-Xmx
在本地使用或通过设置spark.executor.memory
和spark.driver.memory
用于我们的 spark 实例),但这似乎不能很好地概括,对于更大的数据集或更高的缩放级别,我们仍然会遇到错误。
为了更好地理解,以下是我们处理数据的基本概念:
使用加载数据
HadoopGeoTiffRDD.spatial(new Path(path))
将数据映射到某个缩放级别的图块
val extent = geotrellis.proj4.CRS.fromName("EPSG:4326").worldExtent val layout = layoutForZoom(zoom, extent) val metadata: TileLayerMetadata[SpatialKey] = dataSet.collectMetadata[SpatialKey](layout) val rdd = ContextRDD(dataSet.tileToLayout[SpatialKey](metadata), metadata)
和哪里
layoutForZoom
基本一样geotrellis.spark.tiling.ZoomedLayoutScheme.layoutForZoom
rdd.map
然后我们使用rdd.foreach
映射的 rdds对 rdd 的条目执行一些操作。我们聚合四个图块的结果,这些图块对应于更高缩放级别的单个图块,使用
groupByKey
转到 3 直到我们达到一定的缩放级别
目标是:给定 X GB 的内存限制,以我们最多消耗 X GB 的方式对数据进行分区和处理。
似乎数据集的平铺tileToLayout
在更高的缩放级别上已经占用了太多的内存(即使对于非常小的数据集)。根据某些 LayoutDefinition 平铺和加载数据是否有任何替代方法?据我们了解,HadoopGeoTiffRDD.spatial
已经将数据集拆分为小区域,然后按tileToLayout
. 是否可以直接加载与 LayoutDefinition 对应的数据集?
在我们的具体场景中,我们有 3 个工作人员,每个工作人员具有 2GB RAM 和 2 个内核。其中之一也在运行 spark master,它通过来自驱动程序实例的 spark-submit 获取其工作。我们尝试了这样的配置:
val conf = new SparkConf().setAppName("Converter").setMaster("spark://IP-ADDRESS:PORT")
.set("spark.executor.memory", "900m") // to be below the available 1024 MB of default slave RAM
.set("spark.memory.fraction", "0.2") // to get more usable heap space
.set("spark.executor.cores", "2")
.set("spark.executor.instances", "3")
平铺步骤(步骤 2)中堆空间错误的示例:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 5, 192.168.0.2, executor 1): java.lang.OutOfMemoryError: Java heap space
at scala.collection.mutable.ArrayBuilder$ofByte.mkArray(ArrayBuilder.scala:128)
at scala.collection.mutable.ArrayBuilder$ofByte.resize(ArrayBuilder.scala:134)
at scala.collection.mutable.ArrayBuilder$ofByte.sizeHint(ArrayBuilder.scala:139)
at scala.collection.IndexedSeqOptimized$class.slice(IndexedSeqOptimized.scala:115)
at scala.collection.mutable.ArrayOps$ofByte.slice(ArrayOps.scala:198)
at geotrellis.util.StreamingByteReader.getBytes(StreamingByteReader.scala:98)
at geotrellis.raster.io.geotiff.LazySegmentBytes.getBytes(LazySegmentBytes.scala:104)
at geotrellis.raster.io.geotiff.LazySegmentBytes.readChunk(LazySegmentBytes.scala:81)
at geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99)
at geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:185)
at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1012)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1010)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2118)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2118)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
at geotrellis.spark.TileLayerMetadata$.collectMetadataWithCRS(TileLayerMetadata.scala:147)
at geotrellis.spark.TileLayerMetadata$.fromRdd(TileLayerMetadata.scala:281)
at geotrellis.spark.package$withCollectMetadataMethods.collectMetadata(package.scala:212)
...
更新:
我从我的代码中提取了一个示例并将其上传到位于https://gitlab.com/hwuerz/geotrellis-spark-example的存储库。sbt run
您可以使用并选择类在本地运行示例demo.HelloGeotrellis
。example.tif
这将根据我们从缩放级别 20 开始的布局定义为微小的输入数据集创建图块(默认使用两个核心,可以在文件中进行调整HelloGeotrellis.scala
~如果级别 20 仍然有效,它很可能会使用更高的值失败bottomLayer
)。
要在 Spark 集群上运行代码,我使用以下命令:
`sbt package && bash submit.sh --dataLocation /mnt/glusterfs/example.tif --bottomLayer 20 --topLayer 10 --cesiumTerrainDir /mnt/glusterfs/terrain/ --sparkMaster spark://192.168.0.8:7077`
submit.sh
基本上在哪里运行spark-submit
(请参阅 repo 中的文件)。
example.tif
包含在目录中的 repo中DebugFiles
。在我的设置中,文件是通过 glusterfs 分发的,这就是路径指向该位置的原因。这cesiumTerrainDir
只是我们存储生成的输出的目录。
我们认为主要问题可能是使用给定的 api 调用,geotrellis 将布局的完整结构加载到内存中,这对于更高的缩放级别来说太大了。有没有办法避免这种情况?