0

我正在使用带有 GeoSpark 1.2.0 扩展的 Spark 2.4.3。

我有两个表作为范围距离加入。一个表 ( t1) 如果 ~ 100K 行只有一列是 Geospark 的几何图形。另一个表 ( t2) 大约有 30M 行,它由一个Int值和一个 Geospark 的几何列组成。

我想做的只是一个简单的:

    val spark = SparkSession
      .builder()
//      .master("local[*]")
      .config("spark.serializer", classOf[KryoSerializer].getName)
      .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
      .config("geospark.global.index", "true")
      .config("geospark.global.indextype", "rtree")
      .config("geospark.join.gridtype", "rtree")
      .config("geospark.join.numpartition", 200)
      .config("spark.sql.parquet.filterPushdown", "true")
//      .config("spark.sql.shuffle.partitions", 10000)
      .config("spark.sql.autoBroadcastJoinThreshold", -1)
      .appName("PropertyMaster.foodDistanceEatout")
      .getOrCreate()

GeoSparkSQLRegistrator.registerAll(spark)

spark.sparkContext.setLogLevel("ERROR")

spark.read
  .load(s"$dataPath/t2")
  .repartition(200)
  .createOrReplaceTempView("t2")

spark.read
  .load(s"$dataPath/t1")
  .repartition(200)
  .cache()
  .createOrReplaceTempView("t1")

val query =
  """
    |select /*+ BROADCAST(t1) */
    |  t2.cid, ST_Distance(t1.geom, t2.geom) as distance
    |  from t2, t1 where ST_Distance(t1.geom, t2.geom) <= 3218.69""".stripMargin

spark.sql(query)
  .repartition(200)
  .write.mode(SaveMode.Append)
  .option("path", s"$dataPath/my_output.csv")
  .format("csv").save()

我尝试了不同的配置,无论是在本地运行它还是在笔记本电脑上的本地集群上运行它(tot mem 16GB 和 8 个内核),但没有任何运气,因为程序在 GeoSpark 的“Distinct at Join”处崩溃,并有很多改组。但是我无法从 SparkSQL 语法中删除改组。我想在最大的表上添加一个额外的列 ID,例如每 200 行左右的相同整数,然后重新分区,但也没有用。

我期待一个用于 GeoSpark 索引的分区器,但我不确定它是否正常工作。

任何想法?

4

1 回答 1

0

我自己找到了答案,因为 GC 开销的问题是由于分区以及 GeoSpark 的 Partitioner 所需的内存(基于索引)以及由于已解决的长地理查询计算导致的超时,添加以下参数作为GeoSpark网站本身建议:

spark.executor.memory 4g
spark.driver.memory 10g
spark.network.timeout 10000s
spark.driver.maxResultSize 5g
于 2019-07-28T09:47:42.613 回答