2

假设您有 Spark + Standalone 集群管理器。您使用一些配置打开了 spark 会话,并希望SomeSparkJob使用不同的参数并行启动 40 次。

问题

  1. 如何在工作失败时设置reties数量?
  2. 如何在失败时以编程方式重新启动作业?如果作业因缺乏资源而失败,这可能很有用。我可以一一启动所有需要额外资源的工作。
  3. 如何在作业失败时重新启动 Spark 应用程序?如果作业即使同时启动也缺乏资源,这可能很有用。比起更改内核、CPU 等配置,我需要在独立集群管理器中重新启动应用程序。

我的解决方法

1)我很确定第一点是可能的,因为它可以在spark local mode。我只是不知道如何在独立模式下做到这一点。
2-3) 可以在 spark 上下文中传递侦听器,例如spark.sparkContext().addSparkListener(new SparkListener() {. 但似乎SparkListener缺乏失败回调。

还有一堆方法文档很差。我从未使用过它们,但也许它们可以帮助解决我的问题。

spark.sparkContext().dagScheduler().runJob();
spark.sparkContext().runJob()
spark.sparkContext().submitJob()
spark.sparkContext().taskScheduler().submitTasks();
spark.sparkContext().dagScheduler().handleJobCancellation();
spark.sparkContext().statusTracker()
4

2 回答 2

1

您可以使用SparkLauncher并控制流程。

import org.apache.spark.launcher.SparkLauncher;

   public class MyLauncher {
     public static void main(String[] args) throws Exception {
       Process spark = new SparkLauncher()
         .setAppResource("/my/app.jar")
         .setMainClass("my.spark.app.Main")
         .setMaster("local")
         .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
         .launch();
       spark.waitFor();
     }
   }

有关详细信息,请参阅API 。

由于它创建进程,您可以检查进程状态并重试,例如尝试以下操作:

public boolean isAlive()

如果 Process 没有重新启动,请参阅API了解更多详细信息。

希望这可以让我们深入了解我们如何实现您在问题中提到的内容。可能有更多方法可以做同样的事情,但考虑分享这种方法。

干杯!

于 2017-10-05T16:00:16.727 回答
0

检查您的 spark.sql.broadcastTimeout 和 spark.broadcast.blockSize 属性,尝试增加它们。

于 2018-06-20T15:46:16.807 回答