我需要对 DataFrame 和计数进行不同的过滤操作,然后对单个计数进行求和。我使用 Scala Future 进行并发执行。这是代码:
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
val f1 = Future{myDF.filter("pmod(idx, 8) = 1").count}
val f2 = Future{myDF.filter("pmod(idx, 8) = 2").count}
val f3 = Future{myDF.filter("pmod(idx, 8) = 3").count}
val future = for {c1 <- f1; c2 <- f2; c3 <- f3} yield {
c1 + c2 + c3
}
val summ = Await.result(future, 180 second)
每个过滤器/计数操作的运行时间大约需要 7 秒。但是,运行多次后,并发执行的总时间总是在 35 秒左右,而不是我预期的 7 秒。我对这种行为困惑了很长一段时间,但无法弄清楚。
我有一个由 3 台机器组成的集群,一个主节点,两个工作节点,每个节点有 128G 内存和 32 个内核。数据大小约为3G。我注意到在并发执行期间,一个工作节点有 20 秒的 GC 时间。我已经调整了 GC,使得单个过滤器/计数操作几乎没有 GC 时间。我不确定为什么每当我运行 3 个 Future 的并发执行时 GC 就会启动,以及是否是导致并发执行时间更长的原因。
有人在这个问题上有经验吗?