1

我需要对 DataFrame 和计数进行不同的过滤操作,然后对单个计数进行求和。我使用 Scala Future 进行并发执行。这是代码:

import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

val f1  = Future{myDF.filter("pmod(idx, 8) = 1").count}
val f2  = Future{myDF.filter("pmod(idx, 8) = 2").count}
val f3  = Future{myDF.filter("pmod(idx, 8) = 3").count}

val future = for {c1 <- f1; c2 <- f2; c3 <- f3} yield {
   c1 + c2 + c3 
}

val summ = Await.result(future, 180 second)

每个过滤器/计数操作的运行时间大约需要 7 秒。但是,运行多次后,并发执行的总时间总是在 35 秒左右,而不是我预期的 7 秒。我对这种行为困惑了很长一段时间,但无法弄清楚。

我有一个由 3 台机器组成的集群,一个主节点,两个工作节点,每个节点有 128G 内存和 32 个内核。数据大小约为3G。我注意到在并发执行期间,一个工作节点有 20 秒的 GC 时间。我已经调整了 GC,使得单个过滤器/计数操作几乎没有 GC 时间。我不确定为什么每当我运行 3 个 Future 的并发执行时 GC 就会启动,以及是否是导致并发执行时间更长的原因。

有人在这个问题上有经验吗?

4

1 回答 1

1

作业以顺序方式在您的集群中调度,因为脚本中的每个作业都是作业 DAG 中的一个节点,它定义了它们操作的数据之间的优先关系。并且,您的整个脚本的任何成功执行都必须尊重该优先级。

即使您的工作之间没有前向关系,此规则也适用(尽管它们都依赖于相同的数据,myDF)。而且您对 Futures 的使用仅意味着您的作业几乎同时提交给调度程序,而不是它们最终以这种方式被调度。

如果您想要并行性,您应该在作业中编写它,例如:

myDF.filter("pmod(idx,8) < 4 && pmod(idx,8) > 0").groupBy("pmod(idx,8)").count()

是的,你应该缓存myDf

于 2015-11-09T08:17:56.157 回答