4

来自 node.js 背景,我是 Scala 的新手,我尝试使用 Twitter 的 Future.collect 来执行一些简单的并发操作。但是我的代码显示的是顺序行为而不是并发行为。我究竟做错了什么?

这是我的代码,

import com.twitter.util.Future

def waitForSeconds(seconds: Int, container:String): Future[String]  = Future[String] {
  Thread.sleep(seconds*1000)
  println(container + ": done waiting for " + seconds + " seconds")
  container + " :done waiting for " + seconds + " seconds"
}

def mainFunction:String = {
  val allTasks = Future.collect(Seq(waitForSeconds(1, "All"), waitForSeconds(3, "All"), waitForSeconds(2, "All")))
  val singleTask = waitForSeconds(1, "Single")

  allTasks onSuccess  { res =>
    println("All tasks succeeded with result " + res)
  }

  singleTask onSuccess { res =>
    println("Single task succeeded with result " + res)
  }

  "Function Complete"
}

println(mainFunction)

这是我得到的输出,

All: done waiting for 1 seconds
All: done waiting for 3 seconds
All: done waiting for 2 seconds
Single: done waiting for 1 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete

我期望的输出是,

All: done waiting for 1 seconds
Single: done waiting for 1 seconds
All: done waiting for 2 seconds
All: done waiting for 3 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete
4

1 回答 1

8

Twitter 的 future 比 Scala 标准库的 future 更明确地说明了计算的执行位置。特别是,Future.apply将安全地捕获异常(如s.c.Future),但它没有说明计算将在哪个线程中运行。在您的情况下,计算在主线程中运行,这就是您看到结果的原因重看。

与标准库的未来 API 相比,这种方法有几个优点。一方面,它使方法签名更简单,因为没有ExecutionContext必须在任何地方传递的隐式。更重要的是,它可以更轻松地避免上下文切换(这是 Brian Degenhardt 的经典解释)。在这方面,TwitterFuture更像 Scalaz Task,并且具有基本相同的性能优势(例如,在此博客文章中进行了描述)。

更明确计算运行位置的缺点是您必须更明确计算运行位置。在您的情况下,您可以编写如下内容:

import com.twitter.util.{ Future, FuturePool }

val pool = FuturePool.unboundedPool

def waitForSeconds(seconds: Int, container:String): Future[String] = pool {
  Thread.sleep(seconds*1000)
  println(container + ": done waiting for " + seconds + " seconds")
  container + " :done waiting for " + seconds + " seconds"
}

这不会完全产生您要求的输出(“功能完成”将首先打印,并且allTasks不会singleTask相互排序),但它将在单独的线程上并行运行任务。

(作为脚注:FuturePool.unboundedPool在我上面的示例中,这是为演示创建未来池的一种简单方法,并且通常很好,但它不适合 CPU 密集型计算 -有关FuturePool其他创建方法,请参阅API 文档未来的池将使用ExecutorService您提供的并且可以自己管理的池。)

于 2016-12-02T14:33:25.233 回答