1

我们的主要目标是要对大量输入数据(大约 80 GB)执行操作。问题是即使对于较小的数据集,我们也经常会遇到 java 堆空间或其他与内存相关的错误。

我们的临时解决方案是简单地指定更高的最大堆大小(-Xmx在本地使用或通过设置spark.executor.memoryspark.driver.memory用于我们的 spark 实例),但这似乎不能很好地概括,对于更大的数据集或更高的缩放级别,我们仍然会遇到错误。

为了更好地理解,以下是我们处理数据的基本概念:

  1. 使用加载数据HadoopGeoTiffRDD.spatial(new Path(path))

  2. 将数据映射到某个缩放级别的图块

    val extent = geotrellis.proj4.CRS.fromName("EPSG:4326").worldExtent
    val layout = layoutForZoom(zoom, extent)
    val metadata: TileLayerMetadata[SpatialKey] = dataSet.collectMetadata[SpatialKey](layout)
    val rdd = ContextRDD(dataSet.tileToLayout[SpatialKey](metadata), metadata)
    

    和哪里layoutForZoom基本一样geotrellis.spark.tiling.ZoomedLayoutScheme.layoutForZoom

  3. rdd.map然后我们使用rdd.foreach映射的 rdds对 rdd 的条目执行一些操作。

  4. 我们聚合四个图块的结果,这些图块对应于更高缩放级别的单个图块,使用groupByKey

  5. 转到 3 直到我们达到一定的缩放级别

目标是:给定 X GB 的内存限制,以我们最多消耗 X GB 的方式对数据进行分区和处理。

似乎数据集的平铺tileToLayout在更高的缩放级别上已经占用了太多的内存(即使对于非常小的数据集)。根据某些 LayoutDefinition 平铺和加载数据是否有任何替代方法?据我们了解,HadoopGeoTiffRDD.spatial已经将数据集拆分为小区域,然后按tileToLayout. 是否可以直接加载与 LayoutDefinition 对应的数据集?

在我们的具体场景中,我们有 3 个工作人员,每个工作人员具有 2GB RAM 和 2 个内核。其中之一也在运行 spark master,它通过来自驱动程序实例的 spark-submit 获取其工作。我们尝试了这样的配置:

val conf = new SparkConf().setAppName("Converter").setMaster("spark://IP-ADDRESS:PORT")
  .set("spark.executor.memory", "900m") // to be below the available 1024 MB of default slave RAM
  .set("spark.memory.fraction", "0.2") // to get more usable heap space
  .set("spark.executor.cores", "2")
  .set("spark.executor.instances", "3")

平铺步骤(步骤 2)中堆空间错误的示例:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 5, 192.168.0.2, executor 1): java.lang.OutOfMemoryError: Java heap space
        at scala.collection.mutable.ArrayBuilder$ofByte.mkArray(ArrayBuilder.scala:128)
        at scala.collection.mutable.ArrayBuilder$ofByte.resize(ArrayBuilder.scala:134)
        at scala.collection.mutable.ArrayBuilder$ofByte.sizeHint(ArrayBuilder.scala:139)
        at scala.collection.IndexedSeqOptimized$class.slice(IndexedSeqOptimized.scala:115)
        at scala.collection.mutable.ArrayOps$ofByte.slice(ArrayOps.scala:198)
        at geotrellis.util.StreamingByteReader.getBytes(StreamingByteReader.scala:98)
        at geotrellis.raster.io.geotiff.LazySegmentBytes.getBytes(LazySegmentBytes.scala:104)
        at geotrellis.raster.io.geotiff.LazySegmentBytes.readChunk(LazySegmentBytes.scala:81)
        at geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99)
        at geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:185)
        at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1012)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1010)
        at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2118)
        at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2118)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
        at geotrellis.spark.TileLayerMetadata$.collectMetadataWithCRS(TileLayerMetadata.scala:147)
        at geotrellis.spark.TileLayerMetadata$.fromRdd(TileLayerMetadata.scala:281)
        at geotrellis.spark.package$withCollectMetadataMethods.collectMetadata(package.scala:212)
        ...

更新:

我从我的代码中提取了一个示例并将其上传到位于https://gitlab.com/hwuerz/geotrellis-spark-example的存储库。sbt run您可以使用并选择类在本地运行示例demo.HelloGeotrellisexample.tif这将根据我们从缩放级别 20 开始的布局定义为微小的输入数据集创建图块(默认使用两个核心,可以在文件中进行调整HelloGeotrellis.scala~如果级别 20 仍然有效,它很可能会使用更高的值失败bottomLayer)。

要在 Spark 集群上运行代码,我使用以下命令:

 `sbt package && bash submit.sh --dataLocation /mnt/glusterfs/example.tif --bottomLayer 20 --topLayer 10 --cesiumTerrainDir /mnt/glusterfs/terrain/ --sparkMaster spark://192.168.0.8:7077`

submit.sh基本上在哪里运行spark-submit(请参阅 repo 中的文件)。

example.tif包含在目录中的 repo中DebugFiles。在我的设置中,文件是通过 glusterfs 分发的,这就是路径指向该位置的原因。这cesiumTerrainDir只是我们存储生成的输出的目录。

我们认为主要问题可能是使用给定的 api 调用,geotrellis 将布局的完整结构加载到内存中,这对于更高的缩放级别来说太大了。有没有办法避免这种情况?

4

0 回答 0