10

我已经使用多个线程很长时间了,但无法解释这样一个简单的案例。

import java.util.concurrent.Executors
import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

def addOne(x: Int) = Future(x + 1)
def addTwo(x: Int) = Future {addOne(x + 1)}

addTwo(1)
// res5: Future[Future[Int]] = Future(Success(Future(Success(3))))

令我惊讶的是,它有效。我不知道为什么。

问题:
为什么给定一个线程可以同时执行两个 Future?

我的期望
第一个FutureaddTwo)占用唯一的线程(newFixedThreadPool(1)),然后它调用另一个FutureaddOne),它再次需要另一个线程。
所以程序最终应该会因线程不足而陷入困境。

4

2 回答 2

10

您的代码正常工作的原因是两个期货将由同一个线程执行。您正在创建的ExecutionContext不会Thread直接为每个使用,Future而是会安排Runnable要执行的任务(实例)。如果池中没有更多线程可用,这些任务将被置于BlockingQueue等待执行的状态。(详见ThreadPoolExecutor API

如果您查看实现,Executors.newFixedThreadPool(1)您会发现它创建了一个具有无界队列的 Executor:

new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])

要获得您正在寻找的线程饥饿的效果,您可以自己创建一个具有有限队列的执行程序:

 implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L, 
                     TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))

由于 的最小容量ArrayBlockingQueue为 1,您需要三个期货才能达到限制,并且您还需要添加一些代码以在未来的结果上执行,以防止它们完成(在下面的示例中,我这样做是通过添加.map(identity)

下面的例子

import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L, 
                      TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))

def addOne(x: Int) = Future {
  x + 1
}
def addTwo(x: Int) = Future {
  addOne(x + 1) .map(identity)
}
def addThree(x: Int) = Future {
  addTwo(x + 1).map(identity)
}

println(addThree(1))

失败了

java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@65a264b6 rejected from java.util.concurrent.ThreadPoolExecutor@10d078f4[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 1]
于 2019-07-01T05:45:42.770 回答
3

扩展它Promise很容易理解

val p1 = Promise[Future[Int]]
ec.execute(() => {
  // the fist task is start run
  val p2 = Promise[Int]
  //the second task is submit , but no run
  ec.execute(() => {
    p2.complete(Success(1))
    println(s"task 2 -> p1:${p1},p2:${p2}")
  })
  //here the p1 is completed, not wait p2.future finish
  p1.complete(Success(p2.future))
  println(s"task 1 -> p1:${p1},p2:${p2}")// you can see the p1 is completed but the p2 have not
  //first task is finish, will run second task
})
val result: Future[Future[Int]] = p1.future

Thread.sleep(1000)
println(result)
于 2019-07-01T05:28:18.467 回答