2

我在本地机器上创建了一个独立的 spark (2.1.1) 集群,每台机器有 9 个内核/80G(总共 27 个内核/240G Ram)

我有一个示例 spark 作业,它将从 1 到 x 的所有数字相加,这是代码:

package com.example

import org.apache.spark.sql.SparkSession

object ExampleMain {

    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder
          .master("spark://192.168.1.2:7077")
          .config("spark.driver.maxResultSize" ,"3g")
          .appName("ExampleApp")
          .getOrCreate()
      val sc = spark.SparkContext
      val rdd = sc.parallelize(Lisst.range(1, 1000))
      val sum = rdd.reduce((a,b) => a+b)
      println(sum)
      done
    }

    def done = {
      println("\n\n")
      println("-------- DONE --------")
    }
}

运行上面的代码时,几秒钟后我得到了结果,所以我把代码从 1 到 1B (1,000,000,000) 的所有数字加起来,然后达到了 GC 开销限制

我读到如果没有足够的内存,火花应该将内存溢出到硬盘驱动器,我尝试使用我的集群配置,但这没有帮助。

Driver memory = 6G
Number of workers = 24
Cores per worker = 1
Memory per worker = 10

我不是开发人员,对 Scala 一无所知,但想找到一个解决方案来运行此代码而不会出现 GC 问题。

根据@philantrovert 的请求,我正在添加我的 spark-submit 命令

/opt/spark-2.1.1/bin/spark-submit \
--class "com.example.ExampleMain" \
--master spark://192.168.1.2:6066 \
--deploy-mode cluster \
/mnt/spark-share/example_2.11-1.0.jar

此外,我的 spark/conf 如下:

  • slaves 文件包含我的节点(包括主节点)的 3 个 IP 地址
  • spark-defaults 包含:
    • spark.master spark://192.168.1.2:7077
    • spark.driver.memory 10g
  • spark-env.sh 包含:
    • SPARK_LOCAL_DIRS= 所有节点之间的共享文件夹
    • SPARK_EXECUTOR_MEMORY=10G
    • SPARK_DRIVER_MEMORY=10G
    • SPARK_WORKER_CORES=1
    • SPARK_WORKER_MEMORY=10G
    • SPARK_WORKER_INSTANCES=8
    • SPARK_WORKER_DIR= 所有节点之间的共享文件夹
    • SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true"

谢谢

4

1 回答 1

3

我想问题是你List在驱动程序上创建了一个有 10 亿个条目的,这是一个巨大的数据结构(4GB)。有一种更有效的方式以编程方式创建数据集/RDD:

val rdd = spark.range(1000000000L).rdd
于 2017-11-28T11:37:10.560 回答