我想在一个 Spark 集群中并行运行多个 Spark SQL,这样我就可以在整个集群范围内利用整个资源。我正在使用 sqlContext.sql(query)。
我在这里看到了一些示例代码,如下所示,
val parallelism = 10
val executor = Executors.newFixedThreadPool(parallelism)
val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
val tasks: Seq[String] = ???
val results: Seq[Future[Int]] = tasks.map(query => {
Future{
//spark stuff here
0
}(ec)
})
val allDone: Future[Seq[Int]] = Future.sequence(results)
//wait for results
Await.result(allDone, scala.concurrent.duration.Duration.Inf)
executor.shutdown //otherwise jvm will probably not exit
据我了解,ExecutionContext计算机器中的可用内核(使用 ForkJoinPool)并相应地进行并行处理。但是如果考虑单机以外的spark集群会怎样,如何保证集群资源的完整利用率呢?
eg: 如果我有一个 10 节点的集群,每个 4 核,那么上面的代码如何保证 40 核将被使用。
编辑:-
假设有 2 个 sql 要执行,我们有 2 种方法来执行此操作,
按顺序提交查询,以便第二个查询仅在第一个查询执行后完成。(因为 sqlContext.sql(query) 是同步调用)
使用 Futures 并行提交这两个查询,以便假设有足够的资源(在这两种情况下),这两个查询将在集群中独立和并行执行。
我认为第二个更好,因为它使用集群中可用的最大资源,如果第一个查询充分利用了资源,调度程序将等待作业完成(取决于策略),这在这种情况下是公平的。
但是正如 user9613318 提到的“增加池大小将使驱动程序饱和”那么我如何有效地控制线程以更好地利用资源。