0

我想在一个 Spark 集群中并行运行多个 Spark SQL,这样我就可以在整个集群范围内利用整个资源。我正在使用 sqlContext.sql(query)。

我在这里看到了一些示例代码,如下所示,

val parallelism = 10
val executor = Executors.newFixedThreadPool(parallelism)
val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
val tasks: Seq[String] = ???
val results: Seq[Future[Int]] = tasks.map(query => {
  Future{
    //spark stuff here
    0
  }(ec)
})
val allDone: Future[Seq[Int]] = Future.sequence(results)
//wait for results
Await.result(allDone, scala.concurrent.duration.Duration.Inf)
executor.shutdown //otherwise jvm will probably not exit 

据我了解,ExecutionContext计算机器中的可用内核(使用 ForkJoinPool)并相应地进行并行处理。但是如果考虑单机以外的spark集群会怎样,如何保证集群资源的完整利用率呢?

eg: 如果我有一个 10 节点的集群,每个 4 核,那么上面的代码如何保证 40 核将被使用。

编辑:-

假设有 2 个 sql 要执行,我们有 2 种方法来执行此操作,

  1. 按顺序提交查询,以便第二个查询仅在第一个查询执行后完成。(因为 sqlContext.sql(query) 是同步调用)

  2. 使用 Futures 并行提交这两个查询,以便假设有足够的资源(在这两种情况下),这两个查询将在集群中独立和并行执行。

我认为第二个更好,因为它使用集群中可用的最大资源,如果第一个查询充分利用了资源,调度程序将等待作业完成(取决于策略),这在这种情况下是公平的。

但是正如 user9613318 提到的“增加池大小将使驱动程序饱和”那么我如何有效地控制线程以更好地利用资源。

4

1 回答 1

1

并行性在这里的影响很小,额外的集群资源并不会真正影响该方法。Futures(或Threads)不用于并行执行,而是用于避免阻塞执行。增加池大小只会使驱动程序饱和。

您真正应该看的是 Spark 应用程序内调度池和调整窄分区数(如何在 Spark SQL 中更改分区大小,partitionColumn、lowerBound、upperBound 、numPartitions 参数的含义是什么?)和宽(什么应该是 spark.sql.shuffle.partitions 的最佳值,或者我们如何在使用 Spark SQL 时增加分区?)转换。

如果作业是完全独立的(代码结构表明这一点),最好单独提交每个作业,使用自己的一组分配资源,并相应地配置集群调度池。

于 2018-05-04T11:12:40.650 回答