2

伙计们,

我正在运行 pyspark 代码以从 hdfs 读取 500mb 文件并从文件内容构造一个 numpy 矩阵

集群信息:

9 个数据节点 128 GB 内存 /48 vCore CPU /节点

作业配置

  conf = SparkConf().setAppName('test') \
                          .set('spark.executor.cores', 4) \
                          .set('spark.executor.memory', '72g') \
                          .set('spark.driver.memory', '16g') \
                          .set('spark.yarn.executor.memoryOverhead',4096 ) \
                          .set('spark.dynamicAllocation.enabled', 'true') \
                          .set('spark.shuffle.service.enabled', 'true') \
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
                          .set('spark.driver.maxResultSize',10000) \
                          .set('spark.kryoserializer.buffer.max', 2044) 

    fileRDD=sc.textFile("/tmp/test_file.txt")
    fileRDD.cache
    list_of_lines_from_file = fileRDD.map(lambda line: line.split(" ")).collect()

错误

收集件正在吐出内存不足错误。

18/05/17 19:03:15 ERROR client.TransportResponseHandler: Still have 1 
requests outstanding when connection fromHost/IP:53023 is closed
18/05/17 19:03:15 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches
java.lang.OutOfMemoryError: Java heap space

任何帮助深表感谢。

4

1 回答 1

1

关于这个问题的一点背景

当我通过运行在 hadoop 集群的边缘节点上的 Jupyter Notebook 运行代码时,我遇到了这个问题

在 Jupyter 中寻找

因为您只能通过客户端模式从 Jupyter 提交代码(相当于从 edgenode 启动 spark-shell),所以 spark 驱动程序始终是 edgenode,它已经与其他长时间运行的守护进程打包在一起,可用内存总是小于我的文件上 fileRDD.collect() 所需的内存

在 spark-submit 中工作正常

我将 Jupyer 中的内容放到一个 .py 文件中,并通过 spark-submit 以相同的设置调用它哇!,它在那里运行了几秒钟,原因是 spark-submit 被优化为从需要从集群中释放内存的节点之一中选择驱动程序节点。

spark-submit --name  "test_app" --master yarn --deploy-mode cluster --conf spark.executor.cores=4 --conf spark.executor.memory=72g --conf spark.driver.memory=72g --conf spark.yarn.executor.memoryOverhead=8192 --conf spark.dynamicAllocation.enabled=true  --conf spark.shuffle.service.enabled=true --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kryoserializer.buffer.max=2044 --conf spark.driver.maxResultSize=1g --conf spark.driver.extraJavaOptions='-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MaxDirectMemorySize=2g' --conf spark.executor.extraJavaOptions='-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MaxDirectMemorySize=2g' test.py

下一步 :

我们的下一步是查看 Jupyter notebook 是否可以通过 Livy JobServer 或类似方法将 spark 作业提交到 YARN 集群。

于 2018-05-18T20:35:14.630 回答