I recently came across some odd behavior on a machine where mapping a function that returns Future[T] was executing sequentially. The same problem does not occur on other machines: work is interleaved as one would expect. I later discovered that this was likely because Scala was being a little too smart and choosing an ExecutionContext that matched the machine's resources: one core, one worker.

Here's some simple code that reproduces the problem:
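If that guess is right, the default global ExecutionContext sizes its worker pool from the core count the JVM can see. A quick diagnostic (my own snippet, not part of the repro below) is to print that number:

```scala
// ExecutionContext.Implicits.global sizes its pool from the number of
// cores visible to the JVM; on a one-core box that means one worker.
object CoreCheck extends App {
  println(Runtime.getRuntime.availableProcessors)
}
```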
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global  // needed for future { ... }

val ss = List("the", "quick", "brown", "fox",
              "jumped", "over", "the", "lazy", "dog")

def r(s: String): Future[String] = future {
  for (i <- 1 to 10) yield {
    println(String.format("r(%s) waiting %s more seconds...",
                          s, (10 - i).toString))
    Thread.sleep(1000)
  }
  s.reverse
}

val f_revs = ss.map { r(_) }
println("Look ma, no blocking!")
val rev = f_revs.par.map { Await.result(_, Duration.Inf) }.mkString(" ")
println(rev)
Running this on the machine exhibiting the strange behavior produces sequential output like this:
Look ma, no blocking!
r(the) waiting 9 more seconds...
r(the) waiting 8 more seconds...
r(the) waiting 7 more seconds...
Providing a custom ExecutionContext:
import java.util.concurrent.Executors

val pool = Executors.newFixedThreadPool(ss.size)  // enough workers to run every task concurrently
implicit val ec = ExecutionContext.fromExecutor(pool)
allows the threads to interleave on this machine. But I have a new problem now: the thread pool does not shut down, causing the program to hang indefinitely. Apparently, this is the expected behavior for FixedThreadPools, and I can shut it down by calling pool.shutdown() somewhere.
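For completeness, here is what the explicit shutdown looks like in isolation (a minimal sketch; the four-thread pool size and the toy future are mine, not from the code above):

```scala
import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

object ExplicitShutdown extends App {
  val pool = Executors.newFixedThreadPool(4)
  implicit val ec = ExecutionContext.fromExecutor(pool)

  val f = Future { "stressed".reverse }
  println(Await.result(f, 5.seconds))  // prints "desserts"

  // Without this call, the pool's non-daemon threads keep the JVM alive
  // even after all submitted work has finished.
  pool.shutdown()
}
```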
Call me stubborn, but I don't want to have to tell the thread pool to shut down. Is there a way to configure the pool to shut down when all of its queues are empty (possibly after some delay), just like the default pool does? I've looked through the ExecutionContext documentation, but I'm not finding what I'm looking for.
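One configuration that looks close to what I'm describing (an assumption on my part, not something I've verified on the problem machine) is to build the pool from ThreadPoolExecutor directly and let idle core threads time out, so the pool drains itself after a delay and no longer pins the JVM open:

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent._
import scala.concurrent.duration._

object SelfDrainingPool extends App {
  // Four core threads that are allowed to die after 5 idle seconds.
  // Once every thread has timed out, no non-daemon threads remain and
  // the JVM can exit without an explicit shutdown() call.
  val pool = new ThreadPoolExecutor(
    4, 4, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable]())
  pool.allowCoreThreadTimeOut(true)
  implicit val ec = ExecutionContext.fromExecutor(pool)

  val f = Future { "hello".reverse }
  println(Await.result(f, 5.seconds))  // prints "olleh"
}
```

The trade-off is a few seconds of lingering threads after the last task, rather than an immediate exit.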