
I'm running a Spark cluster with 80 machines. Each machine is a VM with 8 cores and 50GB of RAM (of which Spark seems able to use 41GB).

I'm running over several input folders, and I estimate the input size at about 250GB gz-compressed.

I get errors in the driver log and I don't know what to make of them. Some examples (in the order they appear in the log):

240884 [Result resolver thread-0] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 445.0 in stage 1.0 (TID 445, hadoop-w-59.c.taboola-qa-01.internal): java.net.SocketTimeoutException: Read timed out
        java.net.SocketInputStream.socketRead0(Native Method)
        java.net.SocketInputStream.read(SocketInputStream.java:152)
        java.net.SocketInputStream.read(SocketInputStream.java:122)
        java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
        java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
        java.io.BufferedInputStream.read(BufferedInputStream.java:334)
        sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:687)
        sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633)
        sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1323)
        org.apache.spark.util.Utils$.fetchFile(Utils.scala:376)
        org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:325)
        org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:323)
        scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:323)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)


    271722 [Result resolver thread-3] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 247.0 in stage 2.0 (TID 883, hadoop-w-79.c.taboola-qa-01.internal): java.lang.NullPointerException: 
            org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:153)
            org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
            org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
            org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
            org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
            org.apache.spark.scheduler.Task.run(Task.scala:54)
            org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
            java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            java.lang.Thread.run(Thread.java:745)


309052 [Result resolver thread-1] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 272.0 in stage 2.0 (TID 908, hadoop-w-58.c.taboola-qa-01.internal): java.io.IOException: unexpected exception type
        java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
        java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)


820940 [connection-manager-thread] INFO org.apache.spark.network.ConnectionManager  - key already cancelled ? sun.nio.ch.SelectionKeyImpl@1c827563
java.nio.channels.CancelledKeyException
    at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
    at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)

Since my job classes (Phase0) don't appear in any of the stack traces, I'm not sure what these errors can tell me about the root cause of the problem. Any suggestions?

Edit: specifically, the following exception occurs even when I run on folders of just a few GB:

271722 [Result resolver thread-3] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 247.0 in stage 2.0 (TID 883, hadoop-w-79.c.taboola-qa-01.internal): java.lang.NullPointerException: 
            org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:153)
            org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
            org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
            org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
            org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
            org.apache.spark.scheduler.Task.run(Task.scala:54)
            org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
            java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            java.lang.Thread.run(Thread.java:745)

1 Answer


The solution is not specific to the exceptions listed above, but eventually I was able to overcome all of these problems in Spark by following these guidelines:

  1. All machines should be tuned in terms of ulimit and process memory, as follows:

Add the following to /etc/security/limits.conf:

hadoop soft nofile 900000
root soft nofile 900000
hadoop hard nofile 990000
root hard nofile 990000
hadoop hard memlock unlimited
root hard memlock unlimited
hadoop soft memlock unlimited
root soft memlock unlimited

And to /etc/pam.d/common-session and /etc/pam.d/common-session-noninteractive:

"session required pam_limits.so"
  2. Core usage: if you are using VMs, I recommend giving Spark n-1 cores and leaving 1 core for communication and other tasks.

  3. Partitions: I recommend using between 5x and 10x as many partitions as there are cores in use in the cluster. If you see "out of memory" errors, you need to increase the number of partitions (first by raising the multiplier, then by adding machines to the cluster). See the first sketch below this list for a minimal configuration example.

  4. Output arrays per key: if you see errors such as "array size exceeds VM limit", you probably have too much data per key, so you need to reduce the amount of data per key. For example, if you write output files at 1-hour intervals, try reducing that to 10-minute or even 1-minute intervals.

  5. If you still see errors, look them up in the Spark bug tracker; you may need to make sure you upgrade to the latest Spark version. For me, version 1.2 (current at the time) fixed a bug that was causing my job to fail.

  6. Use a Kryo registrator, move all of your RDD transformation logic into separate classes, and make sure all of those classes are registered with Kryo (see the second sketch below this list).
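
To make guideline 3 concrete (together with the n-1 cores from guideline 2), here is a minimal Scala sketch. The cluster size, the 5x multiplier and the input path are illustrative assumptions, not values taken from the job above:

    import org.apache.spark.{SparkConf, SparkContext}

    // Minimal sketch: assume 80 workers giving Spark 7 of their 8 cores each
    // (n-1 as in guideline 2), and aim for roughly 5x-10x that many partitions.
    object PartitionTuningSketch {
      def main(args: Array[String]): Unit = {
        val usedCores = 80 * 7                                        // hypothetical cluster size
        val conf = new SparkConf()
          .setAppName("partition-tuning-sketch")
          .set("spark.default.parallelism", (usedCores * 5).toString) // start at 5x, move toward 10x on OOM

        val sc = new SparkContext(conf)

        // Gzipped input files are not splittable, so repartition explicitly
        // to get the desired parallelism in the shuffle stages.
        val lines = sc.textFile("hdfs:///path/to/input")              // placeholder path
          .repartition(usedCores * 5)

        println(lines.count())
        sc.stop()
      }
    }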

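For guideline 6, here is a minimal sketch of what the Kryo setup can look like. Event, EventParser and JobKryoRegistrator are hypothetical names standing in for your own job classes:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    // Hypothetical classes standing in for the job's transformation logic;
    // the point is that everything shipped to executors is registered with Kryo.
    case class Event(key: String, timestamp: Long, payload: String)

    class EventParser extends Serializable {
      def parse(line: String): Event = {
        val parts = line.split('\t')
        Event(parts(0), parts(1).toLong, parts(2))
      }
    }

    // Register every class that ends up inside closures or shuffled data.
    class JobKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[Event])
        kryo.register(classOf[EventParser])
      }
    }

    object KryoSetupSketch {
      def buildConf(): SparkConf =
        new SparkConf()
          .setAppName("kryo-sketch")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryo.registrator", "JobKryoRegistrator") // use the fully-qualified name if the class is in a package
    }
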
answered 2015-01-05T20:12:52.333