我在本地机器上创建了一个独立的 spark (2.1.1) 集群,每台机器有 9 个内核/80G(总共 27 个内核/240G Ram)
我有一个示例 spark 作业,它将从 1 到 x 的所有数字相加,这是代码:
package com.example
import org.apache.spark.sql.SparkSession
object ExampleMain {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.master("spark://192.168.1.2:7077")
.config("spark.driver.maxResultSize" ,"3g")
.appName("ExampleApp")
.getOrCreate()
val sc = spark.SparkContext
val rdd = sc.parallelize(Lisst.range(1, 1000))
val sum = rdd.reduce((a,b) => a+b)
println(sum)
done
}
def done = {
println("\n\n")
println("-------- DONE --------")
}
}
运行上面的代码时,几秒钟后我得到了结果,所以我把代码从 1 到 1B (1,000,000,000) 的所有数字加起来,然后达到了 GC 开销限制
我读到如果没有足够的内存,火花应该将内存溢出到硬盘驱动器,我尝试使用我的集群配置,但这没有帮助。
Driver memory = 6G
Number of workers = 24
Cores per worker = 1
Memory per worker = 10
我不是开发人员,对 Scala 一无所知,但想找到一个解决方案来运行此代码而不会出现 GC 问题。
根据@philantrovert 的请求,我正在添加我的 spark-submit 命令
/opt/spark-2.1.1/bin/spark-submit \
--class "com.example.ExampleMain" \
--master spark://192.168.1.2:6066 \
--deploy-mode cluster \
/mnt/spark-share/example_2.11-1.0.jar
此外,我的 spark/conf 如下:
- slaves 文件包含我的节点(包括主节点)的 3 个 IP 地址
- spark-defaults 包含:
- spark.master spark://192.168.1.2:7077
- spark.driver.memory 10g
- spark-env.sh 包含:
- SPARK_LOCAL_DIRS= 所有节点之间的共享文件夹
- SPARK_EXECUTOR_MEMORY=10G
- SPARK_DRIVER_MEMORY=10G
- SPARK_WORKER_CORES=1
- SPARK_WORKER_MEMORY=10G
- SPARK_WORKER_INSTANCES=8
- SPARK_WORKER_DIR= 所有节点之间的共享文件夹
- SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true"
谢谢