3

我正在尝试使用 Scala Futures 同时运行 50 多个任务。不幸的是,虽然我只让他们一次运行 2 个。有人能告诉我我做错了什么或如何增加并行度吗?

import ExecutionContext.Implicits.global
import scala.concurrent._
import scala.concurrent.duration._

object Test {
  def main(args: Array[String]) {
    def go() = {
      val list = Seq(
          Future { println("start 1"); Thread.sleep(1000); println("stop 1")}, 
          Future { println("start 2"); Thread.sleep(2000); println("stop 2")},
          Future { println("start 3"); Thread.sleep(3000); println("stop 3")},
          Future { println("start 4"); Thread.sleep(4000); println("stop 4")},
          Future { println("start 5"); Thread.sleep(5000); println("stop 5")}
      )
      Future.sequence(list)
    }
    Await.result(go, Duration.Inf)
  }
}

我得到的结果是

start 1
start 2  
stop 1  
start 3  
stop 2  
start 4  
stop 3  
start 5  
stop 4  
stop 5

为什么我没有得到以下信息?

start 1
start 2
start 3
start 4
start 5
stop 1
stop 2
stop 3
stop 4
stop 5
4

2 回答 2

9

作为并行执行结构,Scala Future 需要某种 ExecutionContext 支持它;通常,这个 ExecutionContext 有一个线程池,它可以从中提取来执行你未来的逻辑。

使 ExecutionEnvironment 可用的最常见方法是将隐式定义的线程池带入作用域,以便将其传递给 Future 构造逻辑。这是通过导入默认定义来完成的,如下所示:

import scala.concurrent.ExecutionContext.implicits.global

正如您将在上面链接的 api 文档中看到的那样,此默认线程池将可用线程数设置为处理器内核数,即

parallelism level == Runtime.availableProcessors
于 2015-10-22T21:41:58.193 回答
1

知道了!谢谢你们俩

import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

object Test {
  def main(args: Array[String]) {
    val executorService  = Executors.newFixedThreadPool(1000)
    val executionContext = ExecutionContext.fromExecutorService(executorService)

    def go(implicit ec: ExecutionContext) = {
      val list = Seq(
          Future { println("start 1"); Thread.sleep(1000); println("stop 1")}, 
          Future { println("start 2"); Thread.sleep(2000); println("stop 2")},
          Future { println("start 3"); Thread.sleep(3000); println("stop 3")},
          Future { println("start 4"); Thread.sleep(4000); println("stop 4")},
          Future { println("start 5"); Thread.sleep(5000); println("stop 5")}
      )
      Future.sequence(list)
    }
    Await.result(go(executionContext), Duration.Inf)
  }
}
于 2015-10-22T22:42:41.600 回答