
I am running a Spark Streaming application on a cluster composed of three nodes, each with one worker and three executors (so 9 executors in total). I am using Spark standalone mode (version 2.1.1).

The application is run with a spark-submit command using the options --deploy-mode client and --conf spark.streaming.stopGracefullyOnShutdown=true. The submit command is run from one of the nodes, let's call it node 1.
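For reference, the submit command looks roughly like this (the master URL, class name and jar name are placeholders for my actual application):

# run from node 1; master URL, class and jar are placeholders
spark-submit \
  --master spark://node1:7077 \
  --deploy-mode client \
  --conf spark.streaming.stopGracefullyOnShutdown=true \
  --class com.example.StreamingApp \
  streaming-app.jar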

As a fault-tolerance test, I stop the worker on node 2 by calling the stop-slave.sh script.
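That is, something like this (assuming $SPARK_HOME points to the Spark 2.1.1 installation on node 2):

# on node 2; $SPARK_HOME assumed to point to the Spark installation
$SPARK_HOME/sbin/stop-slave.sh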

In the executor logs on node 2 I can see several errors related to a FileNotFoundException during the shuffle operations:

ERROR Executor: Exception in task 5.0 in stage 5531241.0 (TID 62488319)
java.io.FileNotFoundException: /opt/spark/spark-31c5b4b0-56e1-45d2-88dc-772b8712833f/executor-0bad0669-57fe-43f9-a77e-1b69cd284523/blockmgr-2aa295ac-78ca-4df6-ab89-51d422e8860e/1c/shuffle_2074211_5_0.index.ecb8e397-c3a3-4c1a-96ba-e153ed92b05c (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:206)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:156)
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I can see 4 of these errors on the same task in each of the 3 executors on node 2.

In the driver logs I can see:

ERROR TaskSetManager: Task 5 in stage 5531241.0 failed 4 times; aborting job
 ...
ERROR JobScheduler: Error running job streaming job 1503995015000 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 5531241.0 failed 4 times, most recent failure: Lost task 5.3 in stage 5531241.0 (TID 62488335, 10.7.94.68, executor 2): java.io.FileNotFoundException: /opt/spark/spark-31c5b4b0-56e1-45d2-88dc-772b8712833f/executor-0bad0669-57fe-43f9-a77e-1b69cd284523/blockmgr-2aa295ac-78ca-4df6-ab89-51d422e8860e/1c/shuffle_2074211_5_0.index.9e6148da-6ce2-4de5-94ab-d95db2c8f9f7 (No such file or directory)

As expected, this shuts the application down: spark.task.maxFailures is reached on a single task and then the application stops.

I ran different tests, and all of them ended with the application stopping. My idea is that the behaviour could vary depending on the precise step in the stream processing at which I ask the worker to stop. In any case, all the other tests failed with the same error described above.

Increasing the parameter spark.task.maxFailures to 8 did not help either; the TaskSetManager just signals the task failing 8 times instead of 4.
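That test only adds one option to the submit command shown above, something like:

# same submit command as before, with the extra option
spark-submit ... --conf spark.task.maxFailures=8 ...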

What if the worker is killed?

I also ran a different test: I killed the worker and the 3 executor processes on node 2 with the kill -9 command. In this case, the streaming application adapted to the remaining resources and kept working.
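Roughly like this (jps is just one way to find the PIDs; Worker and CoarseGrainedExecutorBackend are the process names of the standalone worker and of the executors):

# on node 2: list the worker and executor JVMs, then kill them
jps | grep -E 'Worker|CoarseGrainedExecutorBackend'
kill -9 <worker_pid> <executor_pid_1> <executor_pid_2> <executor_pid_3>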

In the driver logs we can see that the driver notices the missing executor:

ERROR TaskSchedulerImpl: Lost executor 0 on 10.7.94.68: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

and then we notice a long series of the following errors:

17/08/29 14:43:19 ERROR ReceiverTracker: Deregistered receiver for stream 5: Error starting receiver 5 - org.jboss.netty.channel.ChannelException: Failed to bind to: /X.X.X.X:40001
    at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
    at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:162)
    at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:169)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:607)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2028)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2028)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.BindException: Cannot assign requested address
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:414)
    at sun.nio.ch.Net.bind(Net.java:406)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296)
    at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
    ... 3 more

This error keeps appearing in the logs until the killed worker is started again.

Conclusion

Stopping a worker with the dedicated command has an unexpected behaviour: the application should be able to cope with the missing worker, adapt to the remaining resources and keep working (as it does in the kill case).

What are your observations about this problem?

Thank you, Davide

