4

I recently came across some odd behavior on a machine where maping a function that returns Future[T] was executing sequentially. This same problem does not occur on other machines: work is interleaved as one would expect. I later discovered that this was likely because Scala was being a little too smart and choosing an ExecutionContext that matched the machine's resources: one core, one worker.

Here's some simple code that reproduces the problem:

import scala.concurrent._
import scala.concurrent.duration._

val ss = List("the", "quick", "brown", "fox",
              "jumped", "over", "the", "lazy", "dog")

def r(s: String) : Future[String] = future {
        for (i <- 1 to 10) yield {
                println(String.format("r(%s) waiting %s more seconds...",
                        s, (10 - i).toString))
                Thread.sleep(1000)

        }
        s.reverse
}

val f_revs = ss.map { r(_) }

println("Look ma, no blocking!")

val rev = f_revs.par.map { Await.result(_, Duration.Inf) }.mkString(" ")

println(rev)

Running this on the machine exhibiting the strange behavior produces sequential output like this:

Look ma, no blocking!
r(the) waiting 9 more seconds...
r(the) waiting 8 more seconds...
r(the) waiting 7 more seconds...

Providing a custom ExecutionContext:

val pool = Executors.newFixedThreadPool(1)
implicit val ec = ExecutionContext.fromExecutor(pool)

allows the threads to interleave on this machine. But I have a new problem now: the thread pool does not shutdown, causing the program to hang indefinitely. Apparently, this is the expected behavior for FixedThreadPools, and I can shut it down by putting pool.shutdown() somewhere.

Call me stubborn, but I don't want to have to tell the thread pool to shutdown. Is there a way to configure the pool to shutdown when all of the queues are empty (possibly after some delay) just like it works with the default pool? I've looked through the ExecutionContext documentation, but I'm not finding what I'm looking for.

4

2 回答 2

2

Scala 使用自己的 fork-join 实现,其行为与 Java 不同,因此默认行为ExecutionContext与您使用Executors.

一种更简单的方法是设置以下系统属性来配置默认值ExecutionContext

  1. scala.concurrent.context.minThreads施加最小数量的线程。默认为1.
  2. scala.concurrent.context.numThreads设置线程数。默认为x1.
  3. scala.concurrent.context.maxThreads施加最大线程数。默认为x1.

其中的每一个都可以是一个数字,也可以是一个以 开头的数字x,以表示处理器数量的倍数。要增加线程数,您必须同时更改numThreadsmaxThreads。在您的情况下,将两者都设置为x2应该可以。

于 2014-06-11T19:15:00.373 回答
1

看起来 Java 7 有一些额外ExecutorService的 s,特别是一个ForkJoinPool可以做我想要的(即,不需要shutdown()池)。

将池更改为以下内容足以实现我想要的:

val pool = new java.util.concurrent.ForkJoinPool(5)

Java 8 显然有更多的服务

于 2014-06-11T19:02:15.480 回答